/*
 * The MIT License (MIT)
 *
 * Copyright (c) 2014 Richard Andrew Cattermole
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
module dakka.base.remotes.client_handler;
import dakka.base.remotes.defs;
import dakka.base.remotes.messages;
import vibe.d;
import std.string : join;

private __gshared {
	/// Set by stopAllConnections(); polled by the receive loop on every iteration.
	bool stopConnections;
}

/**
 * Starts a task that connects to every address in `settings.ips` on
 * `settings.port` and runs the client side of the message exchange with it.
 *
 * The addresses are processed one after another inside a single task: the
 * connection to the next address is only opened once the receive loop of the
 * previous one has ended (peer disconnected, protocol error, or
 * stopAllConnections() was called).
 *
 * For each connection the init and capabilities messages are sent first, then
 * messages are received and dispatched on their `stage` / `substage` fields
 * until the connection ends. Calling this function resets the stop flag.
 *
 * Params:
 *     settings = supplies the `ips` to connect to and the `port` to use.
 */
void handleClientMessageServer(DakkaRemoteServer settings) {
	stopConnections = false;

	runTask({
		foreach(ip; settings.ips) {
			auto conn = connectTCP(ip, settings.port);
			string addr = conn.remoteAddress.toString();
			logInfo("Connecting to server %s", addr);

			auto director = getDirector();
			listenForCommunications(conn, director);

			initMessage(conn);
			capabilitiesMessage(conn);

			// Highest handshake stage reached so far on this connection (0, 1 or 2).
			ubyte stage;
			// Number of completed loop iterations; drives the periodic sync message.
			size_t iteration;

			logInfo("Main loop for server %s", addr);
			while(conn.connected && !stopConnections) {
				// Once past stage 0, re-send a sync message every 100 iterations.
				if (iteration % 100 == 5 && stage > 0)
					sendSyncMessage(conn);

				DakkaMessage received;
				received.receive(conn);

				if (received.stage == 0) {
					if (stage > 0) {
						// It is an error to receive a stage 0 message after stage 0 has finished.
						conn.close();
						break;
					}

					if (received.substage == 2) {
						// The server's capabilities have arrived, which means stage 0 is finished.
						stage = 1;
						sendSyncMessage(conn);

						director.receivedNodeCapabilities(addr, received.stage0_capabilities);
						logInfo("Server %s has capabilities %s", addr, received.stage0_capabilities.join(", "));
					}
				} else if (received.stage == 1) {
					if (stage == 0) {
						// It is an error to receive a stage 1 message while still in stage 0.
						conn.close();
						break;
					}

					if (received.substage == 1) {
						// Server answered our sync message; move on and ask which actors it has.
						stage = 2;
						askForActors(conn);

						logInfo("Server says %s and %s",
							received.stage1_server_sync.isOverloaded ? "I'm overloaded" : "not overloaded",
							received.stage1_server_sync.willUse ? "will be used" : "won't be used");
					}
				} else if (received.stage == 2) {
					// Actor discovery. No check against the local `stage` is made here.
					if (received.substage == 0) {
						replyForActors(conn);
						logInfo("Server %s asked for our actors", addr);
					} else if (received.substage == 1) {
						// should we do something here?
						foreach(actor; received.stage2_actors) {
							askForActorInfo(conn, actor);
						}

						logInfo("Server %s told us their actors %s", addr, received.stage2_actors.join(", "));
					} else if (received.substage == 2) {
						replyForActor(conn, received.stage2_actor_request);

						logInfo("Server %s asked for our actors %s information", addr, received.stage2_actor_request);
					} else if (received.substage == 3) {
						logActorsInfo(received.stage2_actor, addr);

						logInfo("Server %s has told us their actors %s information", addr, received.stage2_actor.name);
					}
				} else if (received.stage == 3) {
					// Actor instance lifecycle and method calls. No check against the local `stage` is made here.
					if (received.substage == 0) {
						handleRequestOfClassCreation(conn, director, received.stage3_actor_create);
						logInfo("Server %s has asked us to create a %s with parent %s", addr,
							received.stage3_actor_create.classIdentifier,
							received.stage3_actor_create.parentInstanceIdentifier);
					} else if (received.substage == 1) {
						// TODO: how do we get the parent identifier, last arg?
						director.receivedEndClassCreate(received.stage3_actor_verify.uid, addr,
							received.stage3_actor_verify.classInstanceIdentifier, null);
						logInfo("Server %s has created an instance of class %s", addr,
							received.stage3_actor_verify.classInstanceIdentifier);
					} else if (received.substage == 2) {
						// Request to delete an actor instance; reply with whether we complied.
						bool success = director.receivedDeleteClass(addr, received.stage3_actor_destroy);
						askToKill(conn, received.stage3_actor_destroy, success);
						logInfo("Server %s has requested us to kill %s and we are %s", addr,
							received.stage3_actor_destroy, success ? "complying" : "not complying");
					} else if (received.substage == 3) {
						// Response to a delete request that we sent earlier.
						logInfo("Server %s has replied that it has %s killed %s", addr,
							received.stage3_actor_destroy_verify.success ? "been" : "not been",
							received.stage3_actor_destroy_verify.classInstanceIdentifier);
					} else if (received.substage == 4) {
						// NOTE(review): the branch that expects a return value sends nothing back,
						// while the other branch sends the result of the "non blocking" call.
						// This looks inverted; confirm against the director API before changing it.
						if (received.stage3_method_call.expectsReturnValue)
							director.receivedBlockingMethodCall(received.stage3_method_call.uid, addr,
								received.stage3_method_call.classInstanceIdentifier,
								received.stage3_method_call.methodName,
								received.stage3_method_call.data);
						else {
							ubyte[] ret = director.receivedNonBlockingMethodCall(received.stage3_method_call.uid, addr,
								received.stage3_method_call.classInstanceIdentifier,
								received.stage3_method_call.methodName,
								received.stage3_method_call.data);
							classCallMethodReturn(conn, received.stage3_method_call.uid, ret);
						}
						logInfo("Server %s has asked us to call method %s on %s", addr,
							received.stage3_method_call.methodName,
							received.stage3_method_call.classInstanceIdentifier);
					} else if (received.substage == 5) {
						// Return value for a method call we made on the server.
						director.receivedClassReturn(received.stage3_method_return.uid,
							received.stage3_method_return.data);
					} else if (received.substage == 6) {
						director.receivedClassErrored(received.stage3_actor_error.classInstanceIdentifier,
							received.stage3_actor_error.errorClassInstanceIdentifier,
							received.stage3_actor_error.message);
					}
				}

				iteration++;
				sleep(25.msecs);
			}

			// The loop may have ended because of the stop flag while the socket is still open.
			if (stopConnections)
				conn.close();

			logInfo("Server disconnected %s", addr);
		}
	});
}

/**
 * Raises the stop flag. A running receive loop started by
 * handleClientMessageServer() leaves on its next condition check and then
 * closes its connection.
 */
void stopAllConnections() {
	stopConnections = true;
}

/**
 * Sends the stage 0 / substage 0 message, carrying the value of
 * getBuildTitle(), over `conn`.
 *
 * Params:
 *     conn = connection the message is sent on.
 */
void initMessage(TCPConnection conn) {
	import dakka.base.defs : getBuildTitle;
	DakkaMessage sending;

	sending.stage = 0;
	sending.substage = 0;
	sending.stage0_init = getBuildTitle();

	sending.send(conn);
}

/**
 * Sends the stage 0 / substage 1 message, carrying the value of
 * getCapabilities(), over `conn`.
 *
 * Params:
 *     conn = connection the message is sent on.
 */
void capabilitiesMessage(TCPConnection conn) {
	import dakka.base.registration.capabilities : getCapabilities;
	DakkaMessage sending;

	sending.stage = 0;
	sending.substage = 1;
	sending.stage0_capabilities = getCapabilities();

	sending.send(conn);
}

/**
 * Sends the stage 1 / substage 0 sync message, carrying the value of
 * utc0Time(), over `conn`. Used to check that lag isn't too bad.
 *
 * Params:
 *     conn = connection the message is sent on.
 */
void sendSyncMessage(TCPConnection conn) {
	DakkaMessage sending;

	sending.stage = 1;
	sending.substage = 0;
	sending.stage1_client_sync = utc0Time();

	sending.send(conn);
}