1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2014 Richard Andrew Cattermole
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 module dakka.base.remotes.client_handler;
25 import dakka.base.remotes.defs;
26 import dakka.base.remotes.messages;
27 import vibe.d;
28 import std..string : join;
29 
// Process-wide flag (not thread-local): set by stopAllConnections() and
// polled by the message loop in handleClientMessageServer().
private __gshared bool stopConnections;
33 
/**
 * Starts a task that connects to every address in settings.ips on
 * settings.port and runs the client side of the message protocol against it.
 *
 * The addresses are handled one after another inside a single task: the
 * message loop for one server has to end before the next address is tried.
 * A loop ends when the connection drops, when stopAllConnections() has been
 * called, or when the server sends a stage 0 / stage 1 message that is not
 * valid for the stage this client has reached.
 *
 * Params:
 *     settings = supplies the server addresses (ips) and the port to connect to
 */
void handleClientMessageServer(DakkaRemoteServer settings) {
	// re-arm the flag in case stopAllConnections() was called earlier
	stopConnections = false;

	runTask({
		// NOTE(review): servers are contacted sequentially; a later address is
		// only tried once the loop for the previous one has ended. Confirm
		// that this is intended.
		foreach(ip; settings.ips) {
			auto conn = connectTCP(ip, settings.port);
			// make sure the socket is released if anything below throws
			scope(failure) conn.close();

			string addr = conn.remoteAddress.toString();
			logInfo("Connecting to server %s", addr);

			auto director = getDirector();
			listenForCommunications(conn, director);

			// opening handshake: build title first, then our capabilities
			initMessage(conn);
			capabilitiesMessage(conn);

			// highest protocol stage reached with this server so far
			ubyte stage;
			size_t iteration;

			logInfo("Main loop for server %s", addr);
			while(conn.connected && !stopConnections) {
				// once past stage 0, send a sync message on iterations 5, 105, 205, ...
				if (iteration % 100 == 5 && stage > 0)
					sendSyncMessage(conn);

				// read the next message from the server
				DakkaMessage received;
				received.receive(conn);

				if (received.stage == 0) {
					if (stage > 0) {
						// it is an error to get a stage 0 message after stage 0 has finished
						conn.close();
						break;
					}

					if (received.substage == 2) {
						// the server's capabilities, which means stage 0 is finished
						stage = 1;
						sendSyncMessage(conn);

						director.receivedNodeCapabilities(addr, received.stage0_capabilities);
						logInfo("Server %s has capabilities %s", addr, received.stage0_capabilities.join(", "));
					}
				} else if (received.stage == 1) {
					if (stage == 0) {
						// it is an error to get a stage 1 message while still in stage 0
						conn.close();
						break;
					}

					if (received.substage == 1) {
						// the server answered our sync; move on to exchanging actors
						stage = 2;
						askForActors(conn);

						logInfo("Server says %s and %s", received.stage1_server_sync.isOverloaded ? "I'm overloaded" : "not overloaded", received.stage1_server_sync.willUse ? "will be used" : "won't be used");
					}
				} else if (received.stage == 2) {
					if (received.substage == 0) {
						// the server wants the list of our actors
						replyForActors(conn);
						logInfo("Server %s asked for our actors", addr);
					} else if (received.substage == 1) {
						// the server listed its actors; request the details of each one
						foreach(actor; received.stage2_actors) {
							askForActorInfo(conn, actor);
						}

						logInfo("Server %s told us their actors %s", addr, received.stage2_actors.join(", "));
					} else if (received.substage == 2) {
						// the server wants the details of one of our actors
						replyForActor(conn, received.stage2_actor_request);

						logInfo("Server %s asked for our actors %s information", addr, received.stage2_actor_request);
					} else if (received.substage == 3) {
						// the server sent the details of one of its actors
						logActorsInfo(received.stage2_actor, addr);

						logInfo("Server %s has told us their actors %s information", addr, received.stage2_actor.name);
					}
				} else if (received.stage == 3) {
					if (received.substage == 0) {
						// request to create an actor instance on this node
						handleRequestOfClassCreation(conn, director, received.stage3_actor_create);
						logInfo("Server %s has asked us to create a %s with parent %s", addr, received.stage3_actor_create.classIdentifier, received.stage3_actor_create.parentInstanceIdentifier);
					} else if (received.substage == 1) {
						// the server confirmed a creation we asked for
						// TODO: how do we get the parent identifier, last arg?
						director.receivedEndClassCreate(received.stage3_actor_verify.uid, addr, received.stage3_actor_verify.classInstanceIdentifier, null);
						logInfo("Server %s has created an instance of class %s", addr, received.stage3_actor_verify.classInstanceIdentifier);
					} else if (received.substage == 2) {
						// request to delete an actor instance
						bool success = director.receivedDeleteClass(addr, received.stage3_actor_destroy);
						askToKill(conn, received.stage3_actor_destroy, success);
						logInfo("Server %s has requested us to kill %s and we are %s", addr, received.stage3_actor_destroy, success ? "complying" : "not complying");
					} else if (received.substage == 3) {
						// response to a deletion we requested
						logInfo("Server %s has replied that it has %s killed %s", addr, received.stage3_actor_destroy_verify.success ? "been" : "not been", received.stage3_actor_destroy_verify.classInstanceIdentifier);
					} else if (received.substage == 4) {
						// request to call a method on one of our actor instances
						// NOTE(review): a reply is only sent on the branch that does NOT
						// expect a return value; the two director calls look swapped.
						// Verify against the director's API before changing.
						if (received.stage3_method_call.expectsReturnValue)
							director.receivedBlockingMethodCall(received.stage3_method_call.uid, addr, received.stage3_method_call.classInstanceIdentifier, received.stage3_method_call.methodName, received.stage3_method_call.data);
						else {
							ubyte[] ret = director.receivedNonBlockingMethodCall(received.stage3_method_call.uid, addr, received.stage3_method_call.classInstanceIdentifier, received.stage3_method_call.methodName, received.stage3_method_call.data);
							classCallMethodReturn(conn, received.stage3_method_call.uid, ret);
						}
						logInfo("Server %s has asked us to call method %s on %s", addr, received.stage3_method_call.methodName, received.stage3_method_call.classInstanceIdentifier);
					} else if (received.substage == 5) {
						// return data for a method call, identified by its uid
						director.receivedClassReturn(received.stage3_method_return.uid, received.stage3_method_return.data);
					} else if (received.substage == 6) {
						// the server reported an error for an actor instance
						director.receivedClassErrored(received.stage3_actor_error.classInstanceIdentifier, received.stage3_actor_error.errorClassInstanceIdentifier, received.stage3_actor_error.message);
					}
				}

				iteration++;
				// pause before handling the next message
				sleep(25.msecs);
			}

			// we were asked to stop, so hang up on the server ourselves
			if (stopConnections)
				conn.close();

			logInfo("Server disconnected %s", addr);
		}
	});
}
153 
/**
 * Raises the stop flag that the message loop in handleClientMessageServer()
 * polls; a loop that sees the flag set stops iterating and closes its
 * connection.
 */
void stopAllConnections() {
	stopConnections = true;
}
157 
/**
 * Sends the stage 0 / substage 0 message, carrying this node's build title.
 *
 * Params:
 *     conn = connection the message is sent over
 */
void initMessage(TCPConnection conn) {
	import dakka.base.defs : getBuildTitle;

	DakkaMessage msg;

	msg.stage = 0;
	msg.substage = 0;
	msg.stage0_init = getBuildTitle();

	msg.send(conn);
}
166 
/**
 * Sends the stage 0 / substage 1 message, carrying the capabilities returned
 * by getCapabilities().
 *
 * Params:
 *     conn = connection the message is sent over
 */
void capabilitiesMessage(TCPConnection conn) {
	import dakka.base.registration.capabilities : getCapabilities;

	DakkaMessage msg;
	msg.stage = 0;
	msg.substage = 1;
	msg.stage0_capabilities = getCapabilities();
	msg.send(conn);
}
177 
/**
 * Sends the stage 1 / substage 0 sync message, stamped with the value of
 * utc0Time(). Used to check that lag isn't too bad.
 *
 * Params:
 *     conn = connection the message is sent over
 */
void sendSyncMessage(TCPConnection conn) {
	DakkaMessage msg;
	msg.stage = 1;
	msg.substage = 0;
	msg.stage1_client_sync = utc0Time();
	msg.send(conn);
}