1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2014 Richard Andrew Cattermole
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 module dakka.base.remotes.defs;
25 import dakka.base.defs;
26 import dakka.base.registration.actors;
27 import dakka.base.remotes.messages : utc0Time;
28 import vibe.d : msecs, Task, send, sleep;
29 import cerealed.decerealizer;
30 
private shared {
	// Module-wide director instance. Replaced via setDirector and read via getDirector.
	RemoteDirector director;

	// Module constructor: installs a plain RemoteDirector as the default director.
	shared static this() {
		RemoteDirector initial = new RemoteDirector();
		director = cast(shared)initial;
	}
}
38 
/**
 * Replaces the module-wide director with a newly constructed instance of T.
 *
 * Params:
 * 		T = RemoteDirector or a type implicitly convertible to it; it is
 * 		    constructed with no arguments.
 */
void setDirector(T : RemoteDirector)() {
	// The module variable is named `director`; the previous spelling `directory`
	// referred to nothing and only failed once the template was instantiated.
	director = cast(shared)new T();
}
42 
/**
 * Replaces the module-wide director with the given instance.
 *
 * Params:
 * 		dir = Director to install; it is stored as-is (cast to shared), not copied.
 */
void setDirector(RemoteDirector dir) {
	auto replacement = cast(shared)dir;
	director = replacement;
}
46 
/**
 * Fetches the module-wide director.
 *
 * Returns:
 * 		The currently installed director with the shared qualifier cast away.
 */
RemoteDirector getDirector() {
	RemoteDirector current;
	// The read happens inside a synchronized statement, as before.
	synchronized {
		current = cast()director;
	}
	return current;
}
52 
/**
 * Book-keeping and message routing for remote nodes.
 *
 * Keeps the per-address connection tasks, the capabilities each node reported,
 * the identifiers of locally and remotely created class instances, and the
 * return data of blocking remote calls. Outgoing requests are sent to the
 * connection task registered for an address; the received* methods are the
 * entry points for the matching incoming requests.
 */
class RemoteDirector {
	import dakka.base.remotes.messages : DirectorCommunicationActions;
	alias DCA = DirectorCommunicationActions;


	private shared {
		Task[string] remoteConnections; // ThreadID[remoteAddressIdentifier]
		RemoteClassIdentifier[string] remoteClasses; // ClassIdentifier[uniqueInstanceIdentifier]
		string[][string][string] remoteClassInstances; // ClassIdentifier[][ClassType][remoteAddressIdentifier]
		string[][string] nodeCapabilities; //capability[][remoteAddressIdentifier]
		string[][string] localClasses; // ClassInstanceIdentifier[][ClassIdentifier]
		bool[string] addrLagCheckUse; // ShouldUse[remoteAddressIdentifier]
		ubyte[][string] retData; //data[UniqueAccessIdentifier]
	}

	/// True when a connection task is registered for the given address identifier.
	bool validAddressIdentifier(string adder) {return (adder in remoteConnections) !is null;}
	/// Accepts every build info string; this implementation always returns true.
	bool validateBuildInfo(string) {return true;}
	/// Accepts every capability list; this implementation always returns true.
	bool validateClientCapabilities(string[]) {return true;}

	/// Address identifiers of all registered connections.
	string[] allRemoteAddresses() { return cast()remoteConnections.keys; }

	/**
	 * Registers the connection task for an address and marks the address as usable.
	 */
	void assign(string addr, Task task) {
		remoteConnections[addr] = cast(shared)task;
		addrLagCheckUse[addr] = true;
	}

	/**
	 * Forgets the connection task, capabilities and lag flag of an address.
	 * Entries in remoteClassInstances for the address are left untouched.
	 */
	void unassign(string addr) {
		remoteConnections.remove(addr);
		nodeCapabilities.remove(addr);
		addrLagCheckUse.remove(addr);
	}

	/**
	 * Records whether a node is still within the accepted lag and returns that verdict.
	 *
	 * Params:
	 * 		addr = Remote address identifier
	 * 		outBy = Measured lag; the node stays usable while this is below 10000
	 * 		        (unit is not visible here - TODO confirm against the caller)
	 */
	bool shouldContinueUsingNode(string addr, ulong outBy) {
		bool ret = outBy < 10000;
		addrLagCheckUse[addr] = ret;
		return ret;
	}

	/// Stores the capability list a node reported for itself.
	void receivedNodeCapabilities(string addr, string[] caps) {
		nodeCapabilities[addr] = cast(shared)caps;
	}


	/**
	 * Checks whether at least one known node offers every capability that
	 * capabilitiesRequired!T lists for the actor type.
	 */
	bool canCreateRemotely(T : Actor)() {
		import std.algorithm : canFind;
		string[] required = capabilitiesRequired!T;

		foreach(addr, caps; nodeCapabilities) {

			bool good = true;
			foreach(cap; required) {
				if (!canFind(cast()caps, cap)) {
					good = false;
					break;
				}
			}
			if (good) {
				return true;
			}
		}

		return false;
	}

	/// Whether remote creation should be preferred for T; currently always true.
	bool preferablyCreateRemotely(T : Actor)() {
		// ugh our load balancing?
		return true;
	}

	/**
	 * Picks a node on which the class with the given identifier can be created.
	 *
	 * Returns:
	 * 		The first node that offers all required capabilities and whose lag
	 * 		flag is set (a missing flag counts as set).
	 * 		Or null when no node qualifies.
	 */
	string preferableRemoteNodeCreation(string identifier) {
		import std.algorithm : canFind;
		string[] required = capabilitiesRequired(identifier);
		string[] addrs;

		foreach(addr, caps; nodeCapabilities) {
			bool good = true;
			foreach(cap; required) {
				if (!canFind(cast(string[])caps, cap)) {
					good = false;
					break;
				}
			}
			if (good) {
				addrs ~= addr;
			}
		}

		if (addrs is null)
			return null;

		// ugh load balancing?

		// preferable vs not preferable?
		foreach(addr; addrs) {
			if ((cast()addrLagCheckUse).get(addr, true)) {
				return addr;
			}
		}

		return null;
	}


	/**
	 * Registers a new local instance of the given class type.
	 *
	 * Returns:
	 * 		The new instance identifier: type, utc0Time and the number of
	 * 		instances currently recorded for the type, concatenated.
	 */
	string localActorCreate(string type) {
		import std.conv : to;
		if (type !in localClasses)
			localClasses[type] = [];

		// NOTE(review): the count shrinks again once localActorDies removes an entry,
		// so uniqueness leans on utc0Time advancing between calls - confirm its resolution.
		string id = type ~ to!string(utc0Time) ~ to!string(localClasses[type].length);
		localClasses[type] ~= id;
		return id;
	}

	/**
	 * Removes a local instance identifier from the list recorded for its type.
	 * Unknown types and unknown identifiers are ignored.
	 */
	void localActorDies(string type, string identifier) {
		if (type !in localClasses)
			return;

		// Rebuild the list without the dead identifier and store it back. The earlier
		// std.algorithm.remove call treated the identifier as an offset argument and
		// dropped its result, so the stored list was never shortened.
		string[] remaining;
		foreach(id; cast(string[])localClasses[type]) {
			if (id != identifier)
				remaining ~= id;
		}

		localClasses[type] = cast(shared)remaining;
	}


	/**
	 * Received a request to create a class instance.
	 * 
	 * Returns:
	 * 		Class instance identifier
	 * 		Or null when the class cannot be created on this system.
	 */
	string receivedCreateClass(string addr, string uid, string identifier, string parent) {
		import std.algorithm : canFind;
		string instance = null;

		// 	is parent not null?
		// NOTE(review): parentRef is resolved here but not handed to createLocalActor
		// below - confirm whether the parent is meant to be attached to the new actor.
		shared(Actor) parentRef;
		if (parent != "") {
			bool localInstance = false;
			foreach(type2, instances; localClasses) {
				if (canFind(cast()instances, parent)) {
					localInstance = true;
					break;
				}
			}
			//		is the parent a local class?
			if (localInstance) {
				parentRef = cast(shared)getInstance(parent).referenceOfActor;
				// 		else
			} else {
				//  		create a new one via actorOf (in some form)
				parentRef = cast(shared)new ActorRef!Actor(parent, addr);
			}
		}

		// can we create the actor on this system?
		if (canLocalCreate(identifier)) {
			// 		ask the registration system for actors to create it
			instance = createLocalActor(identifier).identifier;

			// else
		} else {
			//      return null
		}

		return instance;
	}

	/**
	 * Received the outcome of a create request; stores it under uid, which is
	 * what createClass waits for.
	 */
	void receivedEndClassCreate(string uid, string addr, string identifier, string parent) {
		remoteClasses[uid] = cast(shared)RemoteClassIdentifier(addr, identifier, parent);
	}

	/**
	 * Received a request to delete a class instance; calls die() on it.
	 *
	 * Returns:
	 * 		Always true
	 */
	bool receivedDeleteClass(string addr, string identifier) {
		getInstance(identifier).die();
		return true;
	}
	
	/**
	 * Received a method call flagged as blocking; invokes the method and drops its result.
	 * NOTE(review): this variant discards the result while the non-blocking variant
	 * returns it, which looks inverted relative to the names - confirm against callers.
	 */
	void receivedBlockingMethodCall(string uid, string addr, string identifier, string method, ubyte[] data) {
		callMethodOnActor(identifier, method, data, addr);
	}

	/**
	 * Received a method call flagged as non-blocking; invokes the method.
	 *
	 * Returns:
	 * 		Whatever callMethodOnActor returns for the call
	 */
	ubyte[] receivedNonBlockingMethodCall(string uid, string addr, string identifier, string method, ubyte[] data){
		return callMethodOnActor(identifier, method, data, addr);
	}
	
	/// Received the return data of a remote call; stores it under uid for callClassBlocking.
	void receivedClassReturn(string uid, ubyte[] data) {
		retData[uid] = cast(shared)data;
	}

	/**
	 * Received an error report; forwards it to onChildError of the instance named by
	 * identifier, passing the instance named by identifier2 (or null when that is empty).
	 */
	void receivedClassErrored(string identifier, string identifier2, string message) {
		getInstance(identifier).onChildError(identifier2 != "" ? getInstance(identifier2) : null, message);
	}

	/// Sends DCA.GoDie to the connection task of addr when it is registered and running.
	void killConnection(string addr) {
		if (addr in remoteConnections)
			if ((cast()remoteConnections[addr]).running)
				remoteConnections[addr].send(DCA.GoDie);
	}

	/// Address identifiers of all registered connections.
	@property string[] connections() {
		return remoteConnections.keys;
	}

	/**
	 * Blocking request to remote server to create a class
	 * 
	 * Returns:
	 * 		Class instance identifier
	 * 		Or null upon the connection ending.
	 */
	string createClass(string addr, string identifier, string parent) {
		import std.conv : to;

		if (addr in remoteConnections) {
			if ((cast()remoteConnections[addr]).running) {
				string uid = identifier ~ parent ~ to!string(utc0Time());

				remoteConnections[addr].send(DCA.CreateClass, uid, identifier, parent);

				// Poll until the answer arrives or the connection goes away. The presence
				// check keeps the lookup valid if unassign(addr) runs while we sleep.
				while(uid !in remoteClasses && (addr in remoteConnections) !is null && (cast()remoteConnections[addr]).running)
				{sleep(25.msecs());}

				if (uid in remoteClasses) {
					remoteClassInstances[addr][identifier] ~= remoteClasses[uid].identifier;
					return remoteClasses[uid].identifier;
				}
			}
		}
		return null;
	}

	/**
	 * Drops a remote instance from the local records and sends DCA.DeleteClass
	 * for it to the connection task of addr.
	 */
	void killClass(string addr, string type, string identifier) {
		string[] newIds;
		foreach(v; remoteClassInstances[addr][type]) {
			if (v != identifier)
				newIds ~= v;
		}

		remoteClassInstances[addr][type] = cast(shared)newIds;
		remoteConnections[addr].send(DCA.DeleteClass, identifier);
	}

	/// Sends DCA.ClassError with both identifiers and the message to the connection task of addr.
	void actorError(string addr, string identifier, string identifier2, string message) {
		remoteConnections[addr].send(DCA.ClassError, identifier, identifier2, message);
	}

	/**
	 * Sends a method call to a remote class instance without waiting for a result
	 * (the trailing flag of the DCA.ClassCall message is false).
	 */
	void callClassNonBlocking(string addr, string identifier, string methodName, ubyte[] data) {
		import std.conv : to;
		
		string uid = identifier ~ methodName ~ to!string(utc0Time());

		remoteConnections[addr].send(DCA.ClassCall, uid, identifier, methodName, cast(shared)data, false);
	}
	
	/**
	 * Sends a method call to a remote class instance and waits for its return data.
	 *
	 * Returns:
	 * 		The data stored by receivedClassReturn for this call
	 * 		Or null when addr has no connection or the connection ends before a reply arrives.
	 */
	ubyte[] callClassBlocking(string addr, string identifier, string methodName, ubyte[] data){
		import std.conv : to;

		// Same guard as createClass: nothing to call without a registered connection.
		if (addr !in remoteConnections)
			return null;
		
		string uid = identifier ~ methodName ~ to!string(utc0Time());

		remoteConnections[addr].send(DCA.ClassCall, uid, identifier, methodName, cast(shared)data, true);
		
		// Poll until the reply arrives or the connection goes away. The presence
		// check keeps the lookup valid if unassign(addr) runs while we sleep.
		while(uid !in retData && (addr in remoteConnections) !is null && (cast()remoteConnections[addr]).running)
				{sleep(25.msecs());}

		// The loop also ends when the connection stops; in that case there is no entry to read.
		if (uid !in retData)
			return null;
				
		ubyte[] ret = cast(ubyte[])retData[uid];
		
		retData.remove(uid);
			
		return ret;
	}

	/// Shuts down the server listeners, then stops all client connections.
	void shutdownAllConnections() {
		import dakka.base.remotes.server_handler : shutdownListeners;
		import dakka.base.remotes.client_handler : stopAllConnections;
		shutdownListeners();
		stopAllConnections();
	}
}
328 
/*
 * Outcome of a remote class creation, as stored by RemoteDirector.
 *
 * Field order matters: values are built positionally as
 * RemoteClassIdentifier(addr, identifier, parent).
 *
 * addr       - remote address identifier the result belongs to
 * identifier - class instance identifier
 * parent     - identifier of the parent passed along with the request
 */
struct RemoteClassIdentifier {
	string addr, identifier, parent;
}
335 
336 /*
337  * Init connections stuff
338  */
339 
/*
 * Description of a remote server, as handed to clientConnect.
 * The initialisers below spell out the defaults the fields would have anyway.
 */
struct DakkaRemoteServer {
	// Host name of the server.
	string hostname = null;
	// IP addresses of the server (how they relate to hostname is decided by the client handler).
	string[] ips = null;
	// Port of the server.
	ushort port = 0;
}
345 
/*
 * Settings for one listening server, as handed to serverStart.
 * The initialisers below spell out the defaults the fields would have anyway.
 */
struct DakkaServerSettings {
	// Port for the server.
	ushort port = 0;

	// Maximum lag time; presumably the threshold used for lag checks, unit not visible here - TODO confirm.
	ulong lagMaxTime = 0;
}
351 
/**
 * Hands every given server description to handleClientMessageServer, in order.
 *
 * Params:
 * 		servers = Servers to connect to
 */
void clientConnect(DakkaRemoteServer[] servers...) {
	import dakka.base.remotes.client_handler;
	for (size_t idx = 0; idx < servers.length; ++idx) {
		// Pass a copy of the element, as the by-value loop did.
		DakkaRemoteServer target = servers[idx];
		handleClientMessageServer(target);
	}
}
358 
/**
 * Hands every given settings value to handleServerMessageServer, in order.
 *
 * Params:
 * 		servers = Settings of the servers to start
 */
void serverStart(DakkaServerSettings[] servers...) {
	import dakka.base.remotes.server_handler;

	for (size_t idx = 0; idx < servers.length; ++idx) {
		// Pass a copy of the element, as the by-value loop did.
		DakkaServerSettings settings = servers[idx];
		handleServerMessageServer(settings);
	}
}
366 
367 /*
368  * Other
369  */
370 
/*
 * Actor reference in the form grabActorFromData reads it from a Decerealizer.
 * Field order is kept as identifier, then addr.
 *
 * identifier - identifier of the referenced actor instance
 * addr       - address of the node; null means no address was supplied
 */
struct DakkaActorRefWrapper {
	string identifier, addr;
}
375 
/**
 * Reads a DakkaActorRefWrapper from the decerealizer and builds an ActorRef!T from it.
 *
 * Params:
 * 		d = Source the wrapper is read from
 * 		addr = Address used when the wrapper carries none and the identifier
 * 		       is not a known local instance
 *
 * Returns:
 * 		A reference using the wrapper's own address when it has one,
 * 		otherwise a reference to the local instance when hasInstance knows the identifier,
 * 		otherwise a reference using the addr argument.
 */
T grabActorFromData(T)(Decerealizer d, string addr=null) {
	DakkaActorRefWrapper unpacked = d.value!DakkaActorRefWrapper;

	// The wrapper names its own node.
	if (unpacked.addr !is null)
		return new ActorRef!T(unpacked.identifier, unpacked.addr);

	// No address in the wrapper and not a local instance: use the caller's address.
	if (!hasInstance(unpacked.identifier))
		return new ActorRef!T(unpacked.identifier, addr);

	return new ActorRef!T(cast(T)getInstance(unpacked.identifier), true);
}