/*
 * The MIT License (MIT)
 *
 * Copyright (c) 2014 Richard Andrew Cattermole
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
module dakka.base.remotes.server_handler;
import dakka.base.remotes.defs;
import dakka.base.remotes.messages;
import vibe.d;
import std.string : join;

private shared {
    /// Every listener set returned by listenTCP, kept so shutdownListeners can stop them.
    TCPListener[][] allConnections;
}

/**
 * Starts listening for Dakka clients on `settings.port` and registers the
 * resulting listeners in `allConnections`.
 *
 * Each accepted connection runs a receive loop that dispatches on the
 * `stage` / `substage` fields of the incoming DakkaMessage:
 *
 * $(UL
 *   $(LI stage 0 - build identifier (substage 0) and client capabilities (substage 1))
 *   $(LI stage 1 - clock synchronisation / lag check (substage 0))
 *   $(LI stage 2 - actor listing and actor information exchange)
 *   $(LI stage 3 - actor creation, destruction, method calls, returns and errors)
 * )
 *
 * The connection is closed when a stage 0 message arrives after stage 0 has
 * completed, when a stage 1 message arrives before it has completed, or when
 * the director rejects the build information or the capabilities.
 *
 * Params:
 *      settings = server settings; `port` and `lagMaxTime` are read here
 */
void handleServerMessageServer(DakkaServerSettings settings) {
    TCPListener[] listensOn = listenTCP(settings.port, (conn) {
        string addr = conn.remoteAddress.toString();
        logInfo("Client connected %s", addr);

        auto director = getDirector();
        listenForCommunications(conn, director);

        // Handshake progress for this connection:
        // 0 = nothing completed, 1 = capabilities received, 2 = time sync received.
        ubyte stage;

        logInfo("Main loop for client %s", addr);
        while (conn.connected) {
            DakkaMessage received;
            received.receive(conn);

            if (received.stage == 0) {
                if (stage > 0) {
                    // its an error to go here from stage 1/2/3
                    conn.close();
                    break;
                }

                if (received.substage == 0) {
                    if (!director.validateBuildInfo(received.stage0_init)) {
                        conn.close();
                        break;
                    }
                    logInfo("Client %s has build identifier of %s", addr, received.stage0_init);
                } else if (received.substage == 1) {
                    // the client capabilities are the last message we should receive in stage 0
                    stage = 1;

                    if (!director.validateClientCapabilities(received.stage0_capabilities)) {
                        // ehh is this client okay to work with us?
                        conn.close();
                        break;
                    }

                    director.receivedNodeCapabilities(addr, received.stage0_capabilities);
                    capabilitiesMessage(conn);

                    logInfo("Client %s has capabilities %s", addr, received.stage0_capabilities.join(", "));
                }
            } else if (received.stage == 1) {
                if (stage == 0) {
                    // its an error to go here from stage 0 to here
                    conn.close();
                    break;
                }

                if (received.substage == 0) {
                    ulong now = utc0Time();
                    stage = 2;
                    askForActors(conn);

                    // Unsigned subtraction wraps around when the client's timestamp is
                    // ahead of ours (clock skew), which would report an enormous lag.
                    // Treat that case as no measurable lag instead.
                    const ulong clientSync = received.stage1_client_sync;
                    ulong outBy = now > clientSync ? now - clientSync : 0;

                    if (outBy > settings.lagMaxTime) {
                        // lag too high.
                        // tell them that
                        bool willUse = director.shouldContinueUsingNode(addr, outBy); // should get this some way though?
                        lagMessage(conn, true, willUse);

                        logInfo("Client %s is overloaded", addr);
                    } else {
                        lagMessage(conn, false, true);
                        logInfo("Client %s is not overloaded", addr);
                    }
                }
            } else if (received.stage == 2) {
                // NOTE(review): stage 2 and 3 messages are handled without checking the
                // local `stage` counter, so they are accepted before the handshake has
                // completed - confirm whether that is intended.
                if (received.substage == 0) {
                    replyForActors(conn);
                    logInfo("Client %s asked for our actors", addr);
                } else if (received.substage == 1) {
                    // should we do something here?
                    foreach (actor; received.stage2_actors) {
                        askForActorInfo(conn, actor);
                    }

                    logInfo("Client %s told us their actors %s", addr, received.stage2_actors.join(", "));
                } else if (received.substage == 2) {
                    replyForActor(conn, received.stage2_actor_request);

                    logInfo("Client %s asked for our actors %s information", addr, received.stage2_actor_request);
                } else if (received.substage == 3) {
                    logInfo("Client %s has told us their actors %s information", addr, received.stage2_actor.name);
                    logActorsInfo(received.stage2_actor, addr);
                }
            } else if (received.stage == 3) {
                if (received.substage == 0) {
                    handleRequestOfClassCreation(conn, director, received.stage3_actor_create);
                    logInfo("Client %s has asked us to create a %s with parent %s", addr,
                        received.stage3_actor_create.classIdentifier,
                        received.stage3_actor_create.parentInstanceIdentifier);
                } else if (received.substage == 1) {
                    // TODO: how do we get the parent identifier, last arg?
                    director.receivedEndClassCreate(received.stage3_actor_verify.uid, addr,
                        received.stage3_actor_verify.classInstanceIdentifier, null);
                    logInfo("Client %s has created an instance of class %s", addr,
                        received.stage3_actor_verify.classInstanceIdentifier);
                } else if (received.substage == 2) {
                    // request to delete an actor instance
                    bool success = director.receivedDeleteClass(addr, received.stage3_actor_destroy);
                    askToKill(conn, received.stage3_actor_destroy, success);
                    logInfo("Client %s has requested us to kill %s and we are %s", addr,
                        received.stage3_actor_destroy, success ? "complying" : "not complying");
                } else if (received.substage == 3) {
                    // requested to delete an actor instance response
                    logInfo("Client %s has replied that it has %s killed %s", addr,
                        received.stage3_actor_destroy_verify.success ? "been" : "not been",
                        received.stage3_actor_destroy_verify.classInstanceIdentifier);
                } else if (received.substage == 4) {
                    // NOTE(review): the return value is sent back only when the caller does
                    // NOT expect one; the two branches look swapped - confirm against the
                    // director's receivedBlockingMethodCall / receivedNonBlockingMethodCall.
                    if (received.stage3_method_call.expectsReturnValue) {
                        director.receivedBlockingMethodCall(received.stage3_method_call.uid, addr,
                            received.stage3_method_call.classInstanceIdentifier,
                            received.stage3_method_call.methodName,
                            received.stage3_method_call.data);
                    } else {
                        ubyte[] ret = director.receivedNonBlockingMethodCall(received.stage3_method_call.uid, addr,
                            received.stage3_method_call.classInstanceIdentifier,
                            received.stage3_method_call.methodName,
                            received.stage3_method_call.data);
                        classCallMethodReturn(conn, received.stage3_method_call.uid, ret);
                    }
                    logInfo("Client %s has asked us to call method %s on %s", addr,
                        received.stage3_method_call.methodName,
                        received.stage3_method_call.classInstanceIdentifier);
                } else if (received.substage == 5) {
                    director.receivedClassReturn(received.stage3_method_return.uid,
                        received.stage3_method_return.data);
                } else if (received.substage == 6) {
                    director.receivedClassErrored(received.stage3_actor_error.classInstanceIdentifier,
                        received.stage3_actor_error.errorClassInstanceIdentifier,
                        received.stage3_actor_error.message);
                }
            }

            // Pause between messages before polling the connection again.
            sleep(25.msecs);
        }

        logInfo("Client disconnected %s", addr);
    });

    synchronized {
        allConnections ~= cast(shared)listensOn;
    }
    logInfo("Started server listening");
}

/**
 * Stops every listener previously registered by handleServerMessageServer.
 *
 * The listeners stay in `allConnections` afterwards; nothing is removed here.
 */
void shutdownListeners() {
    synchronized {
        foreach (ac; allConnections) {
            foreach (c; ac) {
                // cast() strips the shared qualifier so the listener method can be called.
                (cast()c).stopListening();
            }
        }
    }
}

/**
 * Sends this node's capabilities to the peer as a stage 0 / substage 2 message.
 *
 * Params:
 *      conn = connection the message is sent on
 */
void capabilitiesMessage(TCPConnection conn) {
    import dakka.base.registration.capabilities : getCapabilities;
    DakkaMessage sending;

    sending.stage = 0;
    sending.substage = 2;
    sending.stage0_capabilities = getCapabilities();

    sending.send(conn);
}

/**
 * Sends the result of the lag check to the peer as a stage 1 / substage 1 message.
 *
 * Params:
 *      conn    = connection the message is sent on
 *      tooHigh = stored in `stage1_server_sync.isOverloaded`
 *      willUse = stored in `stage1_server_sync.willUse`
 */
void lagMessage(TCPConnection conn, bool tooHigh, bool willUse) {
    DakkaMessage sending;

    sending.stage = 1;
    sending.substage = 1;
    sending.stage1_server_sync.isOverloaded = tooHigh;
    sending.stage1_server_sync.willUse = willUse;

    sending.send(conn);
}