1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2014 Richard Andrew Cattermole
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 module dakka.base.remotes.server_handler;
25 import dakka.base.remotes.defs;
26 import dakka.base.remotes.messages;
27 import vibe.d;
28 import std..string : join;
29 
/**
 * Every listener set started by handleServerMessageServer.
 *
 * One entry is appended per call to handleServerMessageServer, and
 * shutdownListeners walks all of them to stop listening.
 */
private shared TCPListener[][] allConnections;
33 
/**
 * Starts listening for Dakka clients on settings.port and runs the
 * server side of the staged message protocol for every connection.
 *
 * For each accepted connection the handler loops while the connection is
 * open, receives one DakkaMessage per iteration and dispatches on its
 * stage/substage:
 *
 *  - stage 0: build identifier check (substage 0) and client capabilities
 *    (substage 1). Only accepted while the connection is still in stage 0.
 *  - stage 1: clock sync / lag check (substage 0). Rejected while the
 *    connection is still in stage 0.
 *  - stage 2: actor listing and actor information exchange.
 *  - stage 3: actor creation, destruction, method calls, returns and errors.
 *
 * A failed validation or an out-of-order stage 0/1 message closes the
 * connection. The listeners returned by listenTCP are recorded in
 * allConnections so that shutdownListeners can stop them later.
 *
 * Params:
 *     settings = server configuration; this function reads `port` and
 *                `lagMaxTime` from it.
 */
void handleServerMessageServer(DakkaServerSettings settings) {
	TCPListener[] listensOn = listenTCP(settings.port, (conn) {
		string addr = conn.remoteAddress.toString();
		logInfo("Client connected %s", addr);

		auto director = getDirector();
		listenForCommunications(conn, director);

		// Highest protocol stage this connection has reached so far.
		ubyte stage;

		logInfo("Main loop for client %s", addr);
		while(conn.connected) {
			DakkaMessage received;
			received.receive(conn);

			if (received.stage == 0) {
				if (stage > 0) {
					// it's an error to come back here from stage 1/2/3
					conn.close();
					break;
				}

				if (received.substage == 0) {
					if (!director.validateBuildInfo(received.stage0_init)) {
						conn.close();
						break;
					}
					logInfo("Client %s has build identifier of %s", addr, received.stage0_init);
				} else if (received.substage == 1) {
					// the client capabilities are the last message we should receive in stage 0
					stage = 1;

					if (!director.validateClientCapabilities(received.stage0_capabilities)) {
						// the director refused to work with this client
						conn.close();
						break;
					}

					director.receivedNodeCapabilities(addr, received.stage0_capabilities);
					capabilitiesMessage(conn);

					logInfo("Client %s has capabilities %s", addr, received.stage0_capabilities.join(", "));
				}
			} else if (received.stage == 1) {
				if (stage == 0) {
					// it's an error to jump here straight from stage 0
					conn.close();
					break;
				}

				if (received.substage == 0) {
					ulong now = utc0Time();
					stage = 2;
					askForActors(conn);

					// Unsigned subtraction wraps around when the client's
					// timestamp is ahead of ours (clock skew), which would
					// report an enormous lag. Clamp the difference to zero.
					ulong clientTime = received.stage1_client_sync;
					ulong outBy = now > clientTime ? now - clientTime : 0;

					if (outBy > settings.lagMaxTime) {
						// lag too high, tell the client and let the director
						// decide whether the node is still used
						bool willUse = director.shouldContinueUsingNode(addr, outBy); // should get this some way though?
						lagMessage(conn, true, willUse);

						logInfo("Client %s is overloaded", addr);
					} else {
						lagMessage(conn, false, true);
						logInfo("Client %s is not overloaded", addr);
					}
				}
			} else if (received.stage == 2) {
				// NOTE(review): unlike stages 0 and 1, stage 2 and 3 messages
				// are accepted whatever `stage` this connection has reached.
				// Confirm whether a client that skipped the handshake should
				// be rejected here.
				if (received.substage == 0) {
					replyForActors(conn);
					logInfo("Client %s asked for our actors", addr);
				} else if (received.substage == 1) {
					// ask for the details of every actor the client announced
					foreach(actor; received.stage2_actors) {
						askForActorInfo(conn, actor);
					}

					logInfo("Client %s told us their actors %s", addr, received.stage2_actors.join(", "));
				} else if (received.substage == 2) {
					replyForActor(conn, received.stage2_actor_request);

					logInfo("Client %s asked for our actors %s information", addr, received.stage2_actor_request);
				} else if (received.substage == 3) {
					logInfo("Client %s has told us their actors %s information", addr, received.stage2_actor.name);
					logActorsInfo(received.stage2_actor, addr);
				}
			} else if (received.stage == 3) {
				if (received.substage == 0) {
					handleRequestOfClassCreation(conn, director, received.stage3_actor_create);
					logInfo("Client %s has asked us to create a %s with parent %s", addr, received.stage3_actor_create.classIdentifier, received.stage3_actor_create.parentInstanceIdentifier);
				} else if (received.substage == 1) {
					// TODO: how do we get the parent identifier, last arg?
					director.receivedEndClassCreate(received.stage3_actor_verify.uid, addr, received.stage3_actor_verify.classInstanceIdentifier, null);
					logInfo("Client %s has created an instance of class %s", addr, received.stage3_actor_verify.classInstanceIdentifier);
				} else if (received.substage == 2) {
					// request to delete an actor instance
					bool success = director.receivedDeleteClass(addr, received.stage3_actor_destroy);
					askToKill(conn, received.stage3_actor_destroy, success);
					logInfo("Client %s has requested us to kill %s and we are %s", addr, received.stage3_actor_destroy, success ? "complying" : "not complying");
				} else if (received.substage == 3) {
					// requested to delete an actor instance response
					logInfo("Client %s has replied that it has %s killed %s", addr, received.stage3_actor_destroy_verify.success ? "been" : "not been", received.stage3_actor_destroy_verify.classInstanceIdentifier);
				} else if (received.substage == 4) {
					// NOTE(review): the reply is sent on the branch where the
					// caller does NOT expect a return value, and nothing is
					// sent back from here when it does. Presumably the
					// director replies itself for blocking calls - confirm
					// this pairing is not inverted.
					if (received.stage3_method_call.expectsReturnValue)
						director.receivedBlockingMethodCall(received.stage3_method_call.uid, addr, received.stage3_method_call.classInstanceIdentifier, received.stage3_method_call.methodName, received.stage3_method_call.data);
					else {
						ubyte[] ret = director.receivedNonBlockingMethodCall(received.stage3_method_call.uid, addr, received.stage3_method_call.classInstanceIdentifier, received.stage3_method_call.methodName, received.stage3_method_call.data);
						classCallMethodReturn(conn, received.stage3_method_call.uid, ret);
					}
					logInfo("Client %s has asked us to call method %s on %s", addr, received.stage3_method_call.methodName, received.stage3_method_call.classInstanceIdentifier);
				} else if (received.substage == 5) {
					director.receivedClassReturn(received.stage3_method_return.uid, received.stage3_method_return.data);
				} else if (received.substage == 6) {
					director.receivedClassErrored(received.stage3_actor_error.classInstanceIdentifier, received.stage3_actor_error.errorClassInstanceIdentifier, received.stage3_actor_error.message);
				}
			}

			// pause between messages
			sleep(25.msecs);
		}

		logInfo("Client disconnected %s", addr);
	});

	// NOTE(review): a bare `synchronized` statement gets a mutex of its own,
	// so this block may not actually exclude the one in shutdownListeners.
	// Confirm whether a shared lock is needed.
	synchronized {
		allConnections ~= cast(shared)listensOn;
	}
	logInfo("Started server listening");
}
165 
/**
 * Stops every TCP listener recorded in allConnections.
 *
 * The recorded listeners are left in allConnections afterwards.
 *
 * NOTE(review): a bare `synchronized` statement gets a mutex of its own, so
 * this block may not exclude the append in handleServerMessageServer -
 * confirm whether a shared lock is needed.
 */
void shutdownListeners() {
	synchronized {
		foreach(listenerGroup; allConnections) {
			foreach(sharedListener; listenerGroup) {
				// strip `shared` so the listener's method can be called
				(cast()sharedListener).stopListening();
			}
		}
	}
}
175 
/**
 * Sends this node's capabilities to the peer as a stage 0 / substage 2
 * message.
 *
 * Params:
 *     conn = connection the message is sent over.
 */
void capabilitiesMessage(TCPConnection conn) {
	import dakka.base.registration.capabilities : getCapabilities;

	DakkaMessage reply;
	reply.stage = 0;
	reply.substage = 2;

	auto ourCapabilities = getCapabilities();
	reply.stage0_capabilities = ourCapabilities;

	reply.send(conn);
}
186 
/**
 * Sends the result of the lag check to the peer as a stage 1 / substage 1
 * message.
 *
 * Params:
 *     conn    = connection the message is sent over.
 *     tooHigh = written to the message's `isOverloaded` flag.
 *     willUse = written to the message's `willUse` flag.
 */
void lagMessage(TCPConnection conn, bool tooHigh, bool willUse) {
	DakkaMessage reply;

	reply.stage = 1;
	reply.substage = 1;

	reply.stage1_server_sync.isOverloaded = tooHigh;
	reply.stage1_server_sync.willUse = willUse;

	reply.send(conn);
}