/*
 * The MIT License (MIT)
 *
 * Copyright (c) 2014 Richard Andrew Cattermole
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
module dakka.base.remotes.messages;
import dakka.base.remotes.defs;
import vibe.d;

/**
 * A single protocol message exchanged between two nodes.
 *
 * On the wire every message is: one byte `stage`, one byte `substage`, an
 * eight byte size field, and then a payload whose layout depends on the
 * (stage, substage) pair. Only the union member that belongs to that pair is
 * meaningful; all members share the same storage.
 */
struct DakkaMessage {
    ubyte stage;
    ubyte substage;

    union {
        string stage0_init;
        string[] stage0_capabilities;

        ulong stage1_client_sync;
        ServerLagReply stage1_server_sync;

        string[] stage2_actors;
        string stage2_actor_request;
        ActorInformation stage2_actor;

        ActorClassCreation stage3_actor_create;
        ActorClassCreationVerify stage3_actor_verify;

        string stage3_actor_destroy;
        ActorClassDestroyVerify stage3_actor_destroy_verify;

        ActorMethodCall stage3_method_call;
        ActorMethodReturn stage3_method_return;

        ActorError stage3_actor_error;
    }

    /**
     * Reads one complete message from `stream` into this instance.
     *
     * Sets `stage` and `substage` from the two header bytes, reads the size
     * field and then fills the union member matching the pair. Unknown
     * stages or substages consume nothing beyond the size field.
     *
     * Every union member is assigned as a whole (never appended to in place),
     * so a `DakkaMessage` that previously held a different member can be
     * reused safely.
     *
     * Params:
     *      stream = connection to read from
     */
    void receive(TCPConnection stream) {
        import std.conv : to;

        ubyte[2] header;
        stream.read(header);
        stage = header[0];
        substage = header[1];

        ulong size = readULong(stream);
        logInfo("size %s", to!string(size));

        switch (stage) {
            case 0:
                receiveStage0(stream, size);
                break;
            case 1:
                receiveStage1(stream);
                break;
            case 2:
                receiveStage2(stream, size);
                break;
            case 3:
                receiveStage3(stream);
                break;
            default:
                break;
        }
    }

    /// Stage 0: `size` is the string length (substage 0) or the entry count (substage 1 and 2).
    private void receiveStage0(TCPConnection stream, ulong size) {
        if (substage == 0) {
            stage0_init = cast(string)stream.readSome(cast(size_t)size);
        } else if (substage == 1 || substage == 2) {
            string[] capabilities;
            for (ulong i = 0; i < size; i++) {
                capabilities ~= readText(stream);
            }
            stage0_capabilities = capabilities;
        }
    }

    /// Stage 1: a single ulong (substage 0) or two flag bytes (substage 1).
    private void receiveStage1(TCPConnection stream) {
        if (substage == 0) {
            stage1_client_sync = readULong(stream);
        } else if (substage == 1) {
            ubyte[2] flags;
            stream.read(flags);

            ServerLagReply reply;
            reply.isOverloaded = cast(bool)flags[0];
            reply.willUse = cast(bool)flags[1];
            stage1_server_sync = reply;
        }
    }

    /// Stage 2: actor name list, actor request and actor description.
    private void receiveStage2(TCPConnection stream, ulong size) {
        if (substage == 0) {
            // Nothing follows the size field for this message.
        } else if (substage == 1) {
            // The sender also transmits the number of classes. It is not
            // needed because `size` already bounds the loop, but it has to be
            // consumed to stay aligned with the stream.
            readULong(stream);

            string[] actors;
            ulong got;
            while (got < size) {
                ulong classLength = readULong(stream);
                actors ~= cast(string)stream.readSome(cast(size_t)classLength);
                got += ulong.sizeof + classLength;
            }
            stage2_actors = actors;
        } else if (substage == 2) {
            stage2_actor_request = cast(string)stream.readSome(cast(size_t)size);
        } else if (substage == 3) {
            ActorInformation info;
            info.name = readText(stream);

            size_t methodsCount = cast(size_t)readULong(stream);
            for (size_t i = 0; i < methodsCount; i++) {
                ActorMethod method;
                method.name = readText(stream);
                method.return_type = readText(stream);

                size_t argsCount = cast(size_t)readULong(stream);
                for (size_t j = 0; j < argsCount; j++) {
                    ActorMethodArgument arg;
                    arg.type = readText(stream);
                    arg.usage = cast(ActorMethodArgumentUsage)readByte(stream);
                    method.arguments ~= arg;
                }

                info.methods ~= method;
            }
            stage2_actor = info;
        }
    }

    /// Stage 3: class creation/destruction, method call/return and error reports.
    private void receiveStage3(TCPConnection stream) {
        switch (substage) {
            case 0: {
                ActorClassCreation create;
                create.uid = readText(stream);
                create.classIdentifier = readText(stream);
                create.parentInstanceIdentifier = readText(stream);
                stage3_actor_create = create;
                break;
            }
            case 1: {
                ActorClassCreationVerify verify;
                verify.uid = readText(stream);
                verify.classInstanceIdentifier = readText(stream);
                verify.success = readFlag(stream);
                stage3_actor_verify = verify;
                break;
            }
            case 2:
                stage3_actor_destroy = readText(stream);
                break;
            case 3: {
                ActorClassDestroyVerify verify;
                verify.classInstanceIdentifier = readText(stream);
                verify.success = readFlag(stream);
                stage3_actor_destroy_verify = verify;
                break;
            }
            case 4: {
                ActorMethodCall call;
                call.uid = readText(stream);
                call.classInstanceIdentifier = readText(stream);
                call.methodName = readText(stream);
                call.data = readBlock(stream);
                call.expectsReturnValue = readFlag(stream);
                stage3_method_call = call;
                break;
            }
            case 5: {
                ActorMethodReturn ret;
                ret.uid = readText(stream);
                ret.data = readBlock(stream);
                stage3_method_return = ret;
                break;
            }
            case 6: {
                // NOTE(review): errorClassInstanceIdentifier is not part of
                // the wire format (send does not write it either), so it is
                // always empty on the receiving side.
                ActorError error;
                error.classInstanceIdentifier = readText(stream);
                error.message = readText(stream);
                stage3_actor_error = error;
                break;
            }
            default:
                break;
        }
    }

    /**
     * Writes this message to `stream` and flushes it.
     *
     * The two header bytes are always written; for an unknown stage or
     * substage nothing else follows them.
     *
     * Params:
     *      stream = connection to write to
     */
    void send(TCPConnection stream) {
        ubyte[] header = [stage, substage];
        stream.write(header);

        ubyte[] payload = encodePayload();
        if (payload.length > 0) {
            stream.write(payload);
        }

        stream.flush();
        logInfo("Sent something to %s", stream.remoteAddress.toString());
    }

    /**
     * Builds everything that follows the two header bytes: the size field
     * plus the payload of the current (stage, substage) pair.
     *
     * Returns: the encoded bytes, empty for an unknown stage or substage
     */
    private ubyte[] encodePayload() {
        ubyte[] payload;
        ubyte[] content;

        if (stage == 0) {
            if (substage == 0) {
                // size = string length, followed by the string itself
                putText(payload, stage0_init);
            } else if (substage == 1 || substage == 2) {
                // size = number of entries, each entry is length prefixed
                putULong(payload, stage0_capabilities.length);
                foreach (capability; stage0_capabilities) {
                    putText(payload, capability);
                }
            }
        } else if (stage == 1) {
            if (substage == 0) {
                putULong(payload, ulong.sizeof);
                putULong(payload, stage1_client_sync);
            } else if (substage == 1) {
                putULong(payload, 2);
                putFlag(payload, stage1_server_sync.isOverloaded);
                putFlag(payload, stage1_server_sync.willUse);
            }
        } else if (stage == 2) {
            if (substage == 0) {
                putULong(payload, 0);
            } else if (substage == 1) {
                foreach (actor; stage2_actors) {
                    putText(content, actor);
                }

                // size covers the name entries only; the class count sits
                // between the size field and the entries.
                putULong(payload, content.length);
                putULong(payload, stage2_actors.length);
                payload ~= content;
            } else if (substage == 2) {
                putText(payload, stage2_actor_request);
            } else if (substage == 3) {
                putText(content, stage2_actor.name);
                putULong(content, stage2_actor.methods.length);

                foreach (method; stage2_actor.methods) {
                    putText(content, method.name);
                    putText(content, method.return_type);
                    putULong(content, method.arguments.length);

                    foreach (arg; method.arguments) {
                        putText(content, arg.type);
                        content ~= cast(ubyte)arg.usage;
                    }
                }

                putBlock(payload, content);
            }
        } else if (stage == 3) {
            switch (substage) {
                case 0:
                    putText(content, stage3_actor_create.uid);
                    putText(content, stage3_actor_create.classIdentifier);
                    putText(content, stage3_actor_create.parentInstanceIdentifier);
                    break;
                case 1:
                    putText(content, stage3_actor_verify.uid);
                    putText(content, stage3_actor_verify.classInstanceIdentifier);
                    putFlag(content, stage3_actor_verify.success);
                    break;
                case 2:
                    putText(content, stage3_actor_destroy);
                    break;
                case 3:
                    putText(content, stage3_actor_destroy_verify.classInstanceIdentifier);
                    putFlag(content, stage3_actor_destroy_verify.success);
                    break;
                case 4:
                    putText(content, stage3_method_call.uid);
                    putText(content, stage3_method_call.classInstanceIdentifier);
                    putText(content, stage3_method_call.methodName);
                    putBlock(content, stage3_method_call.data);
                    putFlag(content, stage3_method_call.expectsReturnValue);
                    break;
                case 5:
                    putText(content, stage3_method_return.uid);
                    putBlock(content, stage3_method_return.data);
                    break;
                case 6:
                    // NOTE(review): errorClassInstanceIdentifier is not
                    // transmitted; adding it would change the wire format.
                    putText(content, stage3_actor_error.classInstanceIdentifier);
                    putText(content, stage3_actor_error.message);
                    break;
                default:
                    // Unknown substage: only the header is sent.
                    return payload;
            }

            // Every stage 3 message is its content prefixed by the content length.
            putBlock(payload, content);
        }

        return payload;
    }
}

/**
 * Reads exactly `amount` bytes from `stream`.
 *
 * The data is pulled in bounded chunks: this avoids one stream call and one
 * array append per byte, while still not allocating `amount` bytes up front
 * (the value usually comes straight from the remote side).
 *
 * Params:
 *      stream = connection to read from
 *      amount = number of bytes to read
 *
 * Returns: the bytes read; an empty array when `amount` is 0
 */
ubyte[] readSome(TCPConnection stream, size_t amount) {
    enum size_t chunkSize = 1024;

    ubyte[] ret;
    ubyte[chunkSize] buffer;
    size_t remaining = amount;

    while (remaining > 0) {
        size_t n = remaining < chunkSize ? remaining : chunkSize;
        stream.read(buffer[0 .. n]);
        ret ~= buffer[0 .. n];
        remaining -= n;
    }

    return ret;
}

/// Reads one eight byte unsigned integer in the representation used by RawConvTypes.
private ulong readULong(TCPConnection stream) {
    RawConvTypes!ulong raw;
    stream.read(raw.bytes);
    return raw.value;
}

/// Reads a single byte.
private ubyte readByte(TCPConnection stream) {
    ubyte[1] raw;
    stream.read(raw);
    return raw[0];
}

/// Reads a single byte and interprets any non-zero value as true.
private bool readFlag(TCPConnection stream) {
    return cast(bool)readByte(stream);
}

/// Reads an eight byte length followed by that many bytes.
private ubyte[] readBlock(TCPConnection stream) {
    ulong length = readULong(stream);
    return stream.readSome(cast(size_t)length);
}

/// Reads a length prefixed block and returns it as a string.
private string readText(TCPConnection stream) {
    return cast(string)readBlock(stream);
}

/// Appends `value` as eight bytes in the representation used by RawConvTypes.
private void putULong(ref ubyte[] buffer, ulong value) {
    RawConvTypes!ulong raw;
    raw.value = value;
    buffer ~= raw.bytes[];
}

/// Appends a flag as a single byte (1 for true, 0 for false).
private void putFlag(ref ubyte[] buffer, bool flag) {
    buffer ~= cast(ubyte)flag;
}

/// Appends the length of `data` as eight bytes, followed by `data`.
private void putBlock(ref ubyte[] buffer, const(ubyte)[] data) {
    putULong(buffer, data.length);
    buffer ~= data;
}

/// Appends `text` as a length prefixed block.
private void putText(ref ubyte[] buffer, string text) {
    putBlock(buffer, cast(const(ubyte)[])text);
}

/**
 * Gives access to the raw bytes of a value of type `T`.
 *
 * Both members share the same storage, so `bytes` is the in-memory
 * representation of `value` in the byte order of the host.
 */
union RawConvTypes(T) {
    T value;
    ubyte[T.sizeof] bytes;

    ubyte[T.sizeof] opCast() {
        return bytes;
    }
}

// The expected byte positions below hold on a little-endian host.
unittest {
    auto r = RawConvTypes!ulong(4);
    ubyte[] data = r.bytes;

    assert(data.length == 8);
    assert(data[0] == 4);
    for(size_t i=1; i < 8; i++) {
        assert(data[i] == 0);
    }
}

unittest {
    RawConvTypes!ulong data;
    data.bytes = [4, 0, 0, 0, 0, 0, 0, 0];
    assert(data.value == 4);
}

/**
 * Returns the current time as seconds since the Unix epoch.
 *
 * `SysTime.toUnixTime` is already expressed in UTC regardless of the time
 * zone attached to the `SysTime`, so no offset correction is applied.
 * Subtracting the local UTC offset first (as was done previously) shifts the
 * result by that offset on any machine that is not running in UTC.
 */
ulong utc0Time() {
    import std.datetime : Clock, UTC;

    return Clock.currTime(UTC()).toUnixTime();
}

/**
 * Messages
 */

/// Payload of the stage 1, substage 1 message: two flags sent as one byte each.
struct ServerLagReply {
    bool isOverloaded;
    bool willUse;
}

/// Description of an actor class: its name and its methods.
struct ActorInformation {
    string name;
    ActorMethod[] methods;
}

/// Description of one method of an actor class.
struct ActorMethod {
    string name;
    string return_type;
    ActorMethodArgument[] arguments;
}

/// Description of one argument of an actor method.
struct ActorMethodArgument {
    string type;
    ActorMethodArgumentUsage usage;
}

/// How an argument is passed; transmitted as a single byte.
enum ActorMethodArgumentUsage : ubyte {
    In,
    Out,
    Ref
}

/// Payload of the stage 3, substage 0 message.
struct ActorClassCreation {
    string uid;
    string classIdentifier;
    string parentInstanceIdentifier;
}

/// Payload of the stage 3, substage 1 message.
struct ActorClassCreationVerify {
    string uid;
    string classInstanceIdentifier;
    bool success;
}

/// Payload of the stage 3, substage 3 message.
struct ActorClassDestroyVerify {
    string classInstanceIdentifier;
    bool success;
}

/// Payload of the stage 3, substage 4 message.
struct ActorMethodCall {
    string uid;
    string classInstanceIdentifier;
    string methodName;
    ubyte[] data;
    bool expectsReturnValue;
}

/// Payload of the stage 3, substage 5 message.
struct ActorMethodReturn {
    string uid;
    ubyte[] data;
}

/**
 * Payload of the stage 3, substage 6 message.
 *
 * Only `classInstanceIdentifier` and `message` are written by
 * `DakkaMessage.send` and read by `DakkaMessage.receive`;
 * `errorClassInstanceIdentifier` stays local to the sending side.
 */
struct ActorError {
    string classInstanceIdentifier;
    string errorClassInstanceIdentifier;
    string message;
}

/**
 * Creation and sending of messages
 */

/// Sends the stage 2, substage 0 message (no payload) over `conn`.
void askForActors(TCPConnection conn) {
    DakkaMessage sending;

    sending.stage = 2;
    sending.substage = 0;

    sending.send(conn);
}

/// Sends the names returned by `possibleActorNames()` as a stage 2, substage 1 message.
void replyForActors(TCPConnection conn) {
    import dakka.base.registration.actors : possibleActorNames;
    DakkaMessage sending;

    sending.stage = 2;
    sending.substage = 1;
    sending.stage2_actors = possibleActorNames();

    sending.send(conn);
}

/// Sends `name` as a stage 2, substage 2 message.
void askForActorInfo(TCPConnection conn, string name) {
    DakkaMessage sending;

    sending.stage = 2;
    sending.substage = 2;
    sending.stage2_actor_request = name;

    sending.send(conn);
}

/// Sends the result of `getActorInformation(name)` as a stage 2, substage 3 message.
void replyForActor(TCPConnection conn, string name) {
    import dakka.base.registration.actors : getActorInformation;
    DakkaMessage sending;

    sending.stage = 2;
    sending.substage = 3;
    sending.stage2_actor = getActorInformation(name);

    sending.send(conn);
}

/**
 * Sends a stage 3, substage 0 message.
 *
 * Params:
 *      conn = connection to send over
 *      uid = stored in `ActorClassCreation.uid`
 *      identifier = stored in `ActorClassCreation.classIdentifier`
 *      parent = stored in `ActorClassCreation.parentInstanceIdentifier`
 */
void askForClassCreation(TCPConnection conn, string uid, string identifier, string parent) {
    DakkaMessage sending;

    sending.stage = 3;
    sending.substage = 0;
    sending.stage3_actor_create.uid = uid;
    sending.stage3_actor_create.classIdentifier = identifier;
    sending.stage3_actor_create.parentInstanceIdentifier = parent;

    sending.send(conn);
}

/**
 * Forwards a received creation request to the director and reports the
 * outcome back over `conn` as a stage 3, substage 1 message.
 *
 * The request counts as successful when `director.receivedCreateClass`
 * returns a non-null identifier; that identifier is sent back together with
 * the uid of the request.
 */
void handleRequestOfClassCreation(TCPConnection conn, RemoteDirector director, ActorClassCreation data) {
    string identifier = director.receivedCreateClass(conn.remoteAddress.toString(), data.uid, data.classIdentifier, data.parentInstanceIdentifier);
    bool successful = identifier !is null;

    DakkaMessage sending;

    sending.stage = 3;
    sending.substage = 1;
    sending.stage3_actor_verify.uid = data.uid;
    sending.stage3_actor_verify.classInstanceIdentifier = identifier;
    sending.stage3_actor_verify.success = successful;

    sending.send(conn);
}

/// Sends `identifier` as a stage 3, substage 2 message.
void askForClassDeletion(TCPConnection conn, string identifier) {
    DakkaMessage sending;

    sending.stage = 3;
    sending.substage = 2;
    sending.stage3_actor_destroy = identifier;

    sending.send(conn);
}

/// Sends `identifier` and `success` as a stage 3, substage 3 message.
void askToKill(TCPConnection conn, string identifier, bool success) {
    DakkaMessage sending;

    sending.stage = 3;
    sending.substage = 3;
    sending.stage3_actor_destroy_verify.classInstanceIdentifier = identifier;
    sending.stage3_actor_destroy_verify.success = success;

    sending.send(conn);
}

/**
 * Sends a stage 3, substage 4 message.
 *
 * Params:
 *      conn = connection to send over
 *      uid = stored in `ActorMethodCall.uid`
 *      classInstanceIdentifier = stored in `ActorMethodCall.classInstanceIdentifier`
 *      methodName = stored in `ActorMethodCall.methodName`
 *      data = stored in `ActorMethodCall.data`, sent as an opaque byte block
 *      expects = stored in `ActorMethodCall.expectsReturnValue`
 */
void classCallMethod(TCPConnection conn, string uid, string classInstanceIdentifier, string methodName, ubyte[] data, bool expects) {
    DakkaMessage sending;

    sending.stage = 3;
    sending.substage = 4;

    sending.stage3_method_call.uid = uid;
    sending.stage3_method_call.classInstanceIdentifier = classInstanceIdentifier;
    sending.stage3_method_call.methodName = methodName;
    sending.stage3_method_call.data = data;
    sending.stage3_method_call.expectsReturnValue = expects;

    sending.send(conn);
}

/// Sends `uid` and `data` as a stage 3, substage 5 message.
void classCallMethodReturn(TCPConnection conn, string uid, ubyte[] data) {
    DakkaMessage sending;

    sending.stage = 3;
    sending.substage = 5;

    sending.stage3_method_return.uid = uid;
    sending.stage3_method_return.data = data;

    sending.send(conn);
}

/**
 * Sends a stage 3, substage 6 message.
 *
 * `identifier2` is stored in the message struct but is not part of what
 * `DakkaMessage.send` writes, so the remote side never sees it.
 */
void classErroredReport(TCPConnection conn, string identifier, string identifier2, string message) {
    DakkaMessage sending;

    sending.stage = 3;
    sending.substage = 6;
    sending.stage3_actor_error.classInstanceIdentifier = identifier;
    sending.stage3_actor_error.errorClassInstanceIdentifier = identifier2;
    sending.stage3_actor_error.message = message;

    sending.send(conn);
}


/**
 * Communication to director
 */

/// Actions that can be posted to the task started by `listenForCommunications`.
enum DirectorCommunicationActions {
    /// Not acted upon. As the first member it is the default value, so a
    /// default-initialised action sent by mistake does nothing instead of
    /// closing the connection.
    AreYouStillThere,
    GoDie,
    CreateClass,
    DeleteClass,
    ClassCall,
    ClassError
}

/**
 * Starts a task that turns messages posted to it into protocol messages on
 * `conn`, and registers that task with `director` under the remote address.
 *
 * The task loops while the connection is connected, handling one posted
 * message per iteration and then sleeping for 25 ms. When the loop ends it
 * unassigns the remote address from the director.
 *
 * NOTE(review): `receive` appears to block until a message is posted, so
 * after a disconnect the loop would only notice (and call `unassign`) once
 * another message arrives - confirm whether a timed receive is wanted here.
 */
void listenForCommunications(TCPConnection conn, RemoteDirector director) {
    Task task = runTask({
        // The director may not have assigned this task yet when it starts running.

        while (conn.connected) {
            receive(
                (DirectorCommunicationActions action) {
                    if (action == DirectorCommunicationActions.GoDie) {
                        conn.close();
                    }
                },
                (DirectorCommunicationActions action, string str1, string str2, string str3) {
                    if (action == DirectorCommunicationActions.CreateClass) {
                        askForClassCreation(conn, str1, str2, str3);
                    } else if (action == DirectorCommunicationActions.ClassError) {
                        classErroredReport(conn, str1, str2, str3);
                    }
                },
                (DirectorCommunicationActions action, string identifier) {
                    if (action == DirectorCommunicationActions.DeleteClass) {
                        askForClassDeletion(conn, identifier);
                    }
                },
                (DirectorCommunicationActions action, string uid, string cid, string mid, shared(ubyte[]) data, bool expects) {
                    if (action == DirectorCommunicationActions.ClassCall) {
                        classCallMethod(conn, uid, cid, mid, cast(ubyte[])data, expects);
                    }
                });

            sleep(25.msecs);
        }

        director.unassign(conn.remoteAddress.toString());
    });

    director.assign(conn.remoteAddress.toString(), task);
    logInfo("Listening for connections from the director started up");
}

/**
 * Logs a class-like textual rendering of `info` together with the address of
 * the node that reported it.
 *
 * Params:
 *      info = actor description to render
 *      addr = address of the reporting node, used in the log line only
 */
void logActorsInfo(ActorInformation info, string addr) {
    import std.array : join;

    string desc = "class " ~ info.name ~ " {\n";

    foreach (method; info.methods) {
        string[] renderedArgs;

        foreach (arg; method.arguments) {
            string prefix;
            switch (arg.usage) {
                case ActorMethodArgumentUsage.In:
                    prefix = "in ";
                    break;
                case ActorMethodArgumentUsage.Out:
                    prefix = "out ";
                    break;
                case ActorMethodArgumentUsage.Ref:
                    prefix = "ref ";
                    break;
                default:
                    // The value came off the wire as a raw byte and may not
                    // be a named member; such arguments get no prefix.
                    break;
            }
            renderedArgs ~= prefix ~ arg.type;
        }

        desc ~= "    " ~ method.return_type ~ " " ~ method.name ~ "(" ~ renderedArgs.join(", ") ~ ");\n";
    }

    desc ~= "}";

    logInfo("Node %s has told us their actors %s information\n%s", addr, info.name, desc);
}