/*
 * The MIT License (MIT)
 *
 * Copyright (c) 2014 Richard Andrew Cattermole
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
module dakka.base.remotes.defs;
import dakka.base.defs;
import dakka.base.registration.actors;
import dakka.base.remotes.messages : utc0Time;
import vibe.d : msecs, Task, send, sleep;
import cerealed.decerealizer;

private shared {
    // The director currently in use; replaced through setDirector().
    RemoteDirector director;

    shared static this() {
        // Start out with the stock director so getDirector() never yields null.
        director = cast(shared)new RemoteDirector();
    }
}

/**
 * Replaces the active director with a freshly constructed T.
 *
 * Params:
 *      T = RemoteDirector or a subclass with a default constructor
 */
void setDirector(T : RemoteDirector)() {
    // The original assigned to the undeclared name `directory`, which only
    // surfaced as a compile error once this template was instantiated.
    director = cast(shared)new T();
}

/**
 * Replaces the active director with an existing instance.
 *
 * Params:
 *      dir = The director to install
 */
void setDirector(RemoteDirector dir) {
    director = cast(shared)dir;
}

/**
 * Returns:
 *      The active director with the shared qualifier stripped.
 */
RemoteDirector getDirector() {
    synchronized {
        return cast()director;
    }
}

/**
 * Book-keeping hub for remote nodes: tracks connections, the capabilities each
 * node reported, locally and remotely created actor instances, and the data
 * returned by remote method calls.
 */
class RemoteDirector {
    import dakka.base.remotes.messages : DirectorCommunicationActions;
    alias DCA = DirectorCommunicationActions;


    private shared {
        Task[string] remoteConnections; // ThreadID[remoteAddressIdentifier]
        RemoteClassIdentifier[string] remoteClasses; // ClassIdentifier[uniqueInstanceIdentifier]
        string[][string][string] remoteClassInstances; // ClassIdentifier[][ClassType][remoteAddressIdentifier]
        string[][string] nodeCapabilities; //capability[][remoteAddressIdentifier]
        string[][string] localClasses; // ClassInstanceIdentifier[][ClassIdentifier]
        bool[string] addrLagCheckUse; // ShouldUse[remoteAddressIdentifier]
        ubyte[][string] retData; //data[UniqueAccessIdentifier]
    }

    /// True when addr has a registered connection that is still running.
    private bool hasLiveConnection(string addr) {
        return (addr in remoteConnections) !is null && (cast()remoteConnections[addr]).running;
    }

    /// True when a connection has been assigned for the given address.
    bool validAddressIdentifier(string adder) {return (adder in remoteConnections) !is null;}
    /// Hook for vetting a peer's build info; this implementation accepts everything.
    bool validateBuildInfo(string) {return true;}
    /// Hook for vetting a peer's capability list; this implementation accepts everything.
    bool validateClientCapabilities(string[]) {return true;}

    /// Address identifiers of every assigned connection.
    string[] allRemoteAddresses() { return cast()remoteConnections.keys; }

    /**
     * Registers the task handling the connection to addr and marks the node
     * as usable for the lag check.
     */
    void assign(string addr, Task task) {
        remoteConnections[addr] = cast(shared)task;
        addrLagCheckUse[addr] = true;
    }

    /**
     * Forgets the connection, capabilities and lag-check state stored for addr.
     */
    void unassign(string addr) {
        remoteConnections.remove(addr);
        nodeCapabilities.remove(addr);
        addrLagCheckUse.remove(addr);
    }

    /**
     * Records and returns whether the node is still considered usable.
     *
     * Params:
     *      addr  = Remote address identifier
     *      outBy = Measured lag; the node is kept while this is below 10000
     *              (unit is not visible here, presumably milliseconds - confirm)
     */
    bool shouldContinueUsingNode(string addr, ulong outBy) {
        bool ret = outBy < 10000;
        addrLagCheckUse[addr] = ret;
        return ret;
    }

    /// Stores the capability list reported by the node at addr.
    void receivedNodeCapabilities(string addr, string[] caps) {
        nodeCapabilities[addr] = cast(shared)caps;
    }


    /**
     * Returns:
     *      true when at least one known node reports every capability that
     *      actor type T requires.
     */
    bool canCreateRemotely(T : Actor)() {
        import std.algorithm : canFind;
        string[] required = capabilitiesRequired!T;

        foreach(addr, caps; nodeCapabilities) {

            bool good = true;
            foreach(cap; required) {
                if (!canFind(cast()caps, cap)) {
                    good = false;
                    break;
                }
            }
            if (good) {
                return true;
            }
        }

        return false;
    }

    /// Whether T should be created remotely when possible; currently always true.
    bool preferablyCreateRemotely(T : Actor)() {
        // ugh our load balancing?
        return true;
    }

    /**
     * Picks a node on which an actor of the given type identifier can be created.
     *
     * Returns:
     *      The first capable node whose lag-check flag is set (nodes without a
     *      flag count as usable), or null when no node qualifies.
     */
    string preferableRemoteNodeCreation(string identifier) {
        import std.algorithm : canFind;
        string[] required = capabilitiesRequired(identifier);
        string[] addrs;

        foreach(addr, caps; nodeCapabilities) {
            bool good = true;
            foreach(cap; required) {
                if (!canFind(cast(string[])caps, cap)) {
                    good = false;
                    break;
                }
            }
            if (good) {
                addrs ~= addr;
            }
        }

        if (addrs is null)
            return null;

        // ugh load balancing?

        // preferable vs not preferable?
        foreach(addr; addrs) {
            if ((cast()addrLagCheckUse).get(addr, true)) {
                return addr;
            }
        }

        return null;
    }


    /**
     * Registers a new local actor instance of the given type.
     *
     * Returns:
     *      The generated instance identifier: type, current utc0Time and the
     *      number of instances already registered for that type, concatenated.
     */
    string localActorCreate(string type) {
        import std.conv : to;
        if (type !in localClasses)
            localClasses[type] = [];

        string id = type ~ to!string(utc0Time()) ~ to!string(localClasses[type].length);
        localClasses[type] ~= id;
        return id;
    }

    /**
     * Drops a local actor instance from the registry.
     *
     * Unknown types and identifiers are ignored.
     */
    void localActorDies(string type, string identifier) {
        if (type !in localClasses)
            return;

        // std.algorithm.remove works on offsets and returns the shortened
        // range; the original passed the identifier itself and discarded the
        // result, so the instance never left the registry.
        string[] remaining;
        foreach(id; localClasses[type]) {
            if (id != identifier)
                remaining ~= id;
        }
        localClasses[type] = cast(shared)remaining;
    }


    /**
     * Received a request to create a class instance.
     *
     * Params:
     *      addr       = Address identifier of the requesting node
     *      uid        = Request identifier (not used by this implementation)
     *      identifier = Type identifier of the actor to create
     *      parent     = Instance identifier of the parent actor, or "" for none
     *
     * Returns:
     *      Class instance identifier, or null when the type cannot be created
     *      on this node.
     */
    string receivedCreateClass(string addr, string uid, string identifier, string parent) {
        import std.algorithm : canFind;
        string instance = null;

        // is parent not null?
        // NOTE(review): parentRef is resolved here but never handed to the
        // created actor in this function - confirm whether that is intended.
        shared(Actor) parentRef;
        if (parent != "") {
            bool localInstance = false;
            foreach(type, instances; localClasses) {
                if (canFind(cast()instances, parent)) {
                    localInstance = true;
                    break;
                }
            }
            // is the parent a local class?
            if (localInstance) {
                parentRef = cast(shared)getInstance(parent).referenceOfActor;
            } else {
                // create a new one via actorOf (in some form)
                parentRef = cast(shared)new ActorRef!Actor(parent, addr);
            }
        }

        // can we create the actor on this system?
        if (canLocalCreate(identifier)) {
            // ask the registration system for actors to create it
            instance = createLocalActor(identifier).identifier;
        }
        // otherwise instance stays null

        return instance;
    }

    /// Records the outcome of a remote create request under its uid.
    void receivedEndClassCreate(string uid, string addr, string identifier, string parent) {
        remoteClasses[uid] = cast(shared)RemoteClassIdentifier(addr, identifier, parent);
    }

    /// Tells the local instance to die; always reports true.
    bool receivedDeleteClass(string addr, string identifier) {
        getInstance(identifier).die();
        return true;
    }

    /// Invokes method on the local actor and discards whatever it returns.
    void receivedBlockingMethodCall(string uid, string addr, string identifier, string method, ubyte[] data) {
        callMethodOnActor(identifier, method, data, addr);
    }

    /// Invokes method on the local actor and returns what callMethodOnActor yields.
    ubyte[] receivedNonBlockingMethodCall(string uid, string addr, string identifier, string method, ubyte[] data){
        return callMethodOnActor(identifier, method, data, addr);
    }

    /// Stores the data returned for request uid so a waiting callClassBlocking can collect it.
    void receivedClassReturn(string uid, ubyte[] data) {
        retData[uid] = cast(shared)data;
    }

    /**
     * Forwards an error report to the local actor identifier; identifier2
     * names the failing actor, or is "" when there is none.
     */
    void receivedClassErrored(string identifier, string identifier2, string message) {
        getInstance(identifier).onChildError(identifier2 != "" ? getInstance(identifier2) : null, message);
    }

    /// Sends GoDie to the connection task for addr when it is still running.
    void killConnection(string addr) {
        if (hasLiveConnection(addr))
            remoteConnections[addr].send(DCA.GoDie);
    }

    /// Address identifiers of every assigned connection.
    @property string[] connections() {
        return remoteConnections.keys;
    }

    /**
     * Blocking request to remote server to create a class
     *
     * Returns:
     *      Class instance identifier
     *      Or null upon the connection ending.
     */
    string createClass(string addr, string identifier, string parent) {
        import std.conv : to;

        if (!hasLiveConnection(addr))
            return null;

        string uid = identifier ~ parent ~ to!string(utc0Time());

        remoteConnections[addr].send(DCA.CreateClass, uid, identifier, parent);

        // Poll every 25 ms. The connection is looked up again on each pass
        // because unassign() may remove it while we wait; indexing the
        // missing key directly would raise a RangeError.
        while(uid !in remoteClasses && hasLiveConnection(addr))
            {sleep(25.msecs());}

        if (uid in remoteClasses) {
            remoteClassInstances[addr][identifier] ~= remoteClasses[uid].identifier;
            return remoteClasses[uid].identifier;
        }

        return null;
    }

    /**
     * Stops tracking a remote instance and asks its node to delete it.
     *
     * Untracked address/type pairs and unknown connections are skipped.
     */
    void killClass(string addr, string type, string identifier) {
        if ((addr in remoteClassInstances) !is null && (type in remoteClassInstances[addr]) !is null) {
            string[] newIds;
            foreach(v; remoteClassInstances[addr][type]) {
                if (v != identifier)
                    newIds ~= v;
            }

            remoteClassInstances[addr][type] = cast(shared)newIds;
        }

        if (addr in remoteConnections)
            remoteConnections[addr].send(DCA.DeleteClass, identifier);
    }

    /// Reports an actor error to the node at addr; ignored when addr has no connection.
    void actorError(string addr, string identifier, string identifier2, string message) {
        if (addr in remoteConnections)
            remoteConnections[addr].send(DCA.ClassError, identifier, identifier2, message);
    }

    /**
     * Sends a method call to a remote instance without waiting for a result.
     *
     * Ignored when addr has no connection.
     */
    void callClassNonBlocking(string addr, string identifier, string methodName, ubyte[] data) {
        import std.conv : to;

        if (addr !in remoteConnections)
            return;

        string uid = identifier ~ methodName ~ to!string(utc0Time());

        remoteConnections[addr].send(DCA.ClassCall, uid, identifier, methodName, cast(shared)data, false);
    }

    /**
     * Sends a method call to a remote instance and waits for its return data.
     *
     * Returns:
     *      The data stored by receivedClassReturn for this call, or null when
     *      addr has no connection or the connection ends before a reply arrives.
     */
    ubyte[] callClassBlocking(string addr, string identifier, string methodName, ubyte[] data){
        import std.conv : to;

        if (addr !in remoteConnections)
            return null;

        string uid = identifier ~ methodName ~ to!string(utc0Time());

        remoteConnections[addr].send(DCA.ClassCall, uid, identifier, methodName, cast(shared)data, true);

        // Poll every 25 ms until the reply shows up or the connection is gone.
        while(uid !in retData && hasLiveConnection(addr))
            {sleep(25.msecs());}

        // The loop also exits when the connection dies; there is no reply to
        // read in that case and indexing retData would raise a RangeError.
        if (uid !in retData)
            return null;

        ubyte[] ret = cast(ubyte[])retData[uid];

        retData.remove(uid);

        return ret;
    }

    /// Shuts down all server listeners and then all client connections.
    void shutdownAllConnections() {
        import dakka.base.remotes.server_handler : shutdownListeners;
        import dakka.base.remotes.client_handler : stopAllConnections;
        shutdownListeners();
        stopAllConnections();
    }
}

/// Describes a remotely created instance: hosting node, instance id and parent id.
struct RemoteClassIdentifier {
    string addr;
    string identifier;

    string parent;
}

/*
 * Init connections stuff
 */

/// A remote server a client can connect to.
struct DakkaRemoteServer {
    string hostname;
    string[] ips;
    ushort port;
}

/// Settings for a server started locally.
struct DakkaServerSettings {
    ushort port;

    ulong lagMaxTime;
}

/// Hands each given server to handleClientMessageServer.
void clientConnect(DakkaRemoteServer[] servers...) {
    import dakka.base.remotes.client_handler;
    foreach(server; servers) {
        handleClientMessageServer(server);
    }
}

/// Hands each given settings entry to handleServerMessageServer.
void serverStart(DakkaServerSettings[] servers...) {
    import dakka.base.remotes.server_handler;

    foreach(server; servers) {
        handleServerMessageServer(server);
    }
}

/*
 * Other
 */

/// Serialized form of an actor reference: instance id plus hosting address.
struct DakkaActorRefWrapper {
    string identifier;
    string addr;
}

/**
 * Reads a DakkaActorRefWrapper from d and turns it into an actor reference.
 *
 * Params:
 *      d    = Source of the serialized wrapper
 *      addr = Address to use when the wrapper carries none and the instance
 *             is not known locally
 *
 * Returns:
 *      A reference to the local instance when the wrapper has no address and
 *      the identifier is known locally, otherwise a reference built from the
 *      identifier and the wrapper's address (or addr when the wrapper has none).
 */
T grabActorFromData(T)(Decerealizer d, string addr=null) {
    DakkaActorRefWrapper wrapper = d.value!DakkaActorRefWrapper;

    // Test by length rather than `is null`: an empty string that came out of
    // deserialization is not necessarily the null slice.
    if (wrapper.addr.length == 0) {
        if (hasInstance(wrapper.identifier)) {
            return new ActorRef!T(cast(T)getInstance(wrapper.identifier), true);
        } else {
            return new ActorRef!T(wrapper.identifier, addr);
        }
    } else {
        return new ActorRef!T(wrapper.identifier, wrapper.addr);
    }
}