/**
 * Contains the RPC client API.
 *
 * Note:
 * This interface is decoupled from the underlying transport stream and
 * therefore requires manually creating these streams. Higher level interfaces
 * for common stream implementations are available in vibe.rpcchannel.tcp and
 * vibe.rpcchannel.noise.
 *
 * Synopsis:
 * --------
 * abstract class API
 * {
 *     void callMethod();
 *     Event!() onEvent;
 * }
 *
 * auto stream = connectTCP("127.0.0.1", 8080);
 * auto client = createClientSession!(API, TCPConnection)(stream, stream);
 *
 * client.callMethod();
 * client.onEvent ~= () {};
 * client.disconnect();
 * stream.close();
 * --------
 */
module vibe.rpcchannel.client;

import std.traits;
import std.range : ElementType;
import std.exception : enforceEx, collectException;

import tinyevent;
import vibe.core.sync;
import vibe.core.stream;
import vibe.core.core;

import vibe.rpcchannel.base;
import vibe.rpcchannel.protocol;

import std.stdio;

/**
 * Create a new RPCClient implementing API on top of stream.
 *
 * The info argument is handed to the client unchanged and is available
 * as the connectionInfo field of the returned client object.
 *
 * Params:
 *   stream = data stream used to talk to the server
 *   info = user defined information about the connection
 *
 * Returns: the newly constructed client.
 *
 * Example:
 * -----
 * auto client = createClientSession!(API, string)(stream, "Hello");
 * assert(client.connectionInfo == "Hello");
 * -----
 */
RPCClient!(API, ConnectionInfo) createClientSession(API, ConnectionInfo)(
        Stream stream, ConnectionInfo info)
{
    return new RPCClient!(API, ConnectionInfo)(stream, info);
}

/*
 * Generate string to mixin to override API methods. Simply
 * forwards to the callMethod template.
*/
private string generateMethods(API)()
{
    import std.conv : to;

    string generated;

    foreach (member; APIFunctions!API)
    {
        foreach (MethodType; APIFunctionOverloads!(API, member))
        {
            alias TParams = Parameters!MethodType;
            alias TRet = ReturnType!MethodType;
            enum mangle = typeof(MethodType).mangleof;

            // Build the declared parameter list ("T0 t0, T1 t1") and the
            // forwarded argument list (", t0, t1") in a single pass.
            string paramList;
            string argList;
            foreach (i, param; TParams)
            {
                immutable argName = "t" ~ to!string(i);
                if (i != 0)
                    paramList ~= ", ";
                paramList ~= param.stringof ~ " " ~ argName;
                argList ~= ", " ~ argName;
            }

            // The call names the method both by identifier and by the
            // mangled name of this overload.
            immutable call = "callMethod!(" ~ TRet.stringof ~ " function"
                ~ TParams.stringof ~ ")(\"" ~ member ~ "\", \"" ~ mangle ~ "\""
                ~ argList ~ ");";

            // void methods just call, everything else returns the result
            static if (is(TRet == void))
                enum prefix = " ";
            else
                enum prefix = " return ";

            generated ~= "override " ~ TRet.stringof ~ " " ~ member ~ "("
                ~ paramList ~ ")\n{\n" ~ prefix ~ call ~ "\n}\n\n";
        }
    }

    return generated;
}

/*
 * Just make sure all code paths are covered. The compiler
 * actually does a much better job checking the result when
 * we mixin the code for some real tests in test.d ;-)
 */
unittest
{
    static abstract class TestAPI
    {
        void foo();
        int bar(int a, int b);
        @ignoreRPC int baz();
    }

    auto result = generateMethods!TestAPI;
}

/*
 * Return the name of the module where a type is defined. For types
 * for which moduleName does not compile (e.g. builtin types) return
 * an empty string.
 */
private template modName(T)
{
    static if (!__traits(compiles, moduleName!T))
        enum modName = "";
    else
        enum modName = moduleName!T;
}

/*
 * Check all return and parameter types of the RPC functions in API
 * and use modName to generate imports for the modules defining these
 * types.
*/
private string generateImports(API)()
{
    // Used as a set of module names; the stored value is meaningless
    int[string] modules;

    foreach (MethodType; APIOverloads!API)
    {
        modules[modName!(ReturnType!MethodType)] = 1;

        foreach (Param; Parameters!MethodType)
            modules[modName!Param] = 1;
    }

    string result;
    foreach (mod; modules.keys)
    {
        // modName yields an empty string when there is nothing to import
        if (mod.length == 0)
            continue;
        result ~= "import " ~ mod ~ ";\n";
    }

    return result;
}

/*
 * Just make sure all code paths are covered. The compiler
 * actually does a much better job checking the result when
 * we mixin the code for some real tests in test.d ;-)
 */
unittest
{
    struct S
    {
    }

    static abstract class TestAPI
    {
        S foo();
        int bar(S a, int b);
        @ignoreRPC int baz();
    }

    auto result = generateImports!TestAPI;
}

/*
 * Contains information about a pending RPC call. As the call is usually made in
 * a different Fiber than the Fiber processing responses (readTask) we have
 * to do some synchronization using _readyEvent. Can't use Mutexes as transferring
 * lock ownership is not supported.
 */
private struct RPCRequest
{
    // Signaled once a new response is ready
    private ManualEvent _readyEvent;

    // Whether the waiting caller should throw an Exception on resume
    bool exception;
    // Type of the following response (only error/result is allowed)
    ResponseType type;

    // Initialize this RPCRequest by creating its event
    void initialize()
    {
        _readyEvent = createManualEvent();
    }

    // Notify waitReady callers
    void emitReady()
    {
        _readyEvent.emit();
    }

    // Block until emitReady is called
    void waitReady()
    {
        _readyEvent.wait();
    }
}

/**
 * This implements an RPC client for API.
 *
 * The ConnectionInfo type is used to present information about the underlying
 * connection to the user.
* To do this, RPCClient exposes the connectionInfo
 * field of type ConnectionInfo.
 *
 * The client overrides all functions and events in API not marked with a
 * IgnoreUDA attribute. It additionally provides an onDisconnect event which
 * gets emitted when the connection gets disconnected locally or by the server.
 *
 * The disconnect method can be used to close the connection. The underlying
 * stream still has to get closed manually.
 *
 * Note: Event handlers are called from the Task processing the results of calls.
 * Because of this, event handlers may not directly call RPC methods. If you
 * need to call a RPC method from an event handler, spawn a new Task using runTask
 * first.
 */
class RPCClient(API, ConnectionInfo) : API
{
private:
    // Underlying data connection. Set to null by shutdown().
    Stream _stream;
    // Serializes writes to the stream (call requests and the disconnect signal)
    TaskMutex _writeMutex;
    // Makes sure there is only one pending call at a time
    TaskMutex _requestMutex;

    // Counter of message ids
    uint _idCounter = 0;

    // The task reading from the stream and dispatching events
    Task _readTask;
    // Signalled when a call result or event is done processing
    ManualEvent _readDone;
    // Whether we are processing a event or call result. Avoid interleaving
    bool _processingRead = false;

    // The pending call request
    RPCRequest _pending;

    // We're processing a event/response, do not interleave by reading other responses
    void startRead()
    {
        _processingRead = true;
    }

    // We've finished this response. Can now read next response.
    void finishRead()
    {
        _processingRead = false;
        _readDone.emit();
    }

    /*
     * We were somehow disconnected. Clean up all tasks and notify waiting
     * calls by throwing Exceptions.
     *
     * Marks the pending request as failed and wakes its waiter, clears
     * _stream (emitting onDisconnect the first time only) and interrupts
     * the read task unless we are running in it.
     */
    void shutdown() nothrow
    {
        _pending.exception = true;
        collectException(_pending.emitReady());

        if (_stream)
        {
            _stream = null;
            collectException(onDisconnect.emit(this));
        }

        if (Task.getThis != _readTask)
            collectException(_readTask.interrupt());
    }

    /*
     * Read the header message (ErrorMessage/ResultMessage) of a response.
     * A failure here leaves the stream in an unknown state, so the session
     * is shut down before the exception is passed on.
     */
    T readResponseHeader(T)()
    {
        try
            return _stream.deserializeJsonLine!T();
        catch (Exception e)
        {
            shutdown();
            throw e;
        }
    }

    /*
     * Call remote method identified by name and mangle.
     * Serialize parameters and results according to MethodType.
     *
     * Params:
     *   name = name of the remote method
     *   mangle = mangled name identifying the overload
     *   args = arguments, sent one JSON line each
     *
     * Returns: the deserialized result (nothing for void methods).
     *
     * Throws: DisconnectedException if the session is or gets disconnected,
     *   RPCException if the server answered with an error response.
     */
    ReturnType!MethodType callMethod(MethodType)(string name, string mangle,
            Parameters!MethodType args)
    {
        assert(Task.getThis != _readTask, "Can not call remote methods from event handler task");
        // Make sure there can't be multiple pending calls
        synchronized (_requestMutex)
        {
            // Check first: without a connection there is no read in progress
            // that we would have to finish.
            enforceEx!DisconnectedException(connected);
            scope (exit)
                finishRead();

            alias TParams = Parameters!MethodType;
            alias TRet = ReturnType!MethodType;
            auto msgID = _idCounter++;

            // Remember the emit count before the request goes out. Writing
            // may yield to the read task, which can then signal the response
            // before we start waiting; a plain wait() would miss that emit.
            const readyCount = _pending._readyEvent.emitCount;

            // Send request
            synchronized (_writeMutex)
            {
                // The session may have been shut down while we were
                // blocked on the write lock.
                enforceEx!DisconnectedException(connected);

                _stream.serializeToJsonLine(RequestType.call);
                auto msg = CallMessage(msgID, name, mangle, TParams.length);
                _stream.serializeToJsonLine(msg);
                foreach (arg; args)
                    _stream.serializeToJsonLine(arg);
                _stream.flush();
            }

            // Wait for response; returns at once if it was already signalled
            _pending._readyEvent.wait(readyCount);

            // If we were interrupted, disconnected, ...
            if (_pending.exception)
                throw new DisconnectedException();

            switch (_pending.type)
            {
            case ResponseType.error:
                auto info = readResponseHeader!ErrorMessage();
                throw new RPCException( /*info.type, */ info.message, info.file, info.line);
            case ResponseType.result:
                auto info = readResponseHeader!ResultMessage();

                // Exceptions here caused by deserialization failure of the result are recoverable
                static if (is(TRet == void))
                {
                    // Discard result
                    if (info.hasResult)
                        _stream.deserializeJsonLine!void();
                    break;
                }
                else
                {
                    return _stream.deserializeJsonLine!TRet();
                }
            default:
                assert(false);
            }
        }
    }

    /*
     * Received event from server. Read its parameters and call all
     * registered event handlers. Failures are not propagated; remaining
     * parameters are skipped instead so the stream stays in sync.
     */
    void emitEvent(string name)(EventMessage event)
    {
        alias EventType = ElementType!(typeof(__traits(getMember, typeof(this), name)));
        alias TArgs = Parameters!EventType;
        assert(event.parameters == TArgs.length);
        TArgs args;

        size_t parametersRead = 0;
        try
        {
            foreach (i, Arg; TArgs)
            {
                parametersRead++;
                args[i] = _stream.deserializeJsonLine!Arg();
            }
            mixin(`this.` ~ name ~ `.emit(args);`);
        }
        catch (InterruptException e)
        {
            // The read task is being stopped: must reach readTaskMain
            throw e;
        }
        catch (Exception e)
        {
            // A handler may have disconnected the session before failing
            if (connected)
                _stream.skipParameters(event.parameters - parametersRead);
        }
    }

    /*
     * Read the EventMessage from the data stream and dispatch to
     * emitEvent. Events not matching any API event by name and
     * parameter count are skipped.
     */
    void handleEventMessage()
    {
        scope (exit)
            finishRead();

        auto event = _stream.deserializeJsonLine!EventMessage();

        foreach (member; APIEvents!API)
        {
            alias EventType = ElementType!(typeof(__traits(getMember, API,
                    member)));

            if (event.target == member && event.parameters == Parameters!(EventType).length)
            {
                emitEvent!member(event);
                return;
            }
        }

        // Ignore unhandled events
        _stream.skipParameters(event.parameters);
    }

    /*
     * This task is the only task reading the underlying connection.
     * Classify incoming messages as event/error/result and dispatch
     * to handleEventMessage or transfer control to the Task which originally
     * called the RPC method.
     */
    void readTaskMain()
    {
        try
        {
            // An event handler running in this task may disconnect the
            // session; stop instead of reading from a null stream.
            while (connected)
            {
                // If another task is busy processing a message
                while (_processingRead)
                    _readDone.wait();

                // Start processing a message
                startRead();

                const type = _stream.deserializeJsonLine!ResponseType();
                switch (type)
                {
                case ResponseType.disconnect:
                    shutdown();
                    return;
                case ResponseType.error:
                    goto case;
                case ResponseType.result:
                    // The calling task reads the rest of the response and
                    // signals _readDone when it is done.
                    _pending.type = type;
                    _pending.emitReady();
                    break;
                case ResponseType.event:
                    handleEventMessage();
                    break;
                default:
                    throw new Exception("Invalid response type");
                }
            }
        }
        catch (InterruptException)
        {
            // OK, terminate
        }
        catch (Exception e)
        {
            shutdown();
        }
    }

    /*
     * Construct a new RPCClient and start the task reading from stream.
     */
    this(Stream stream, ConnectionInfo info)
    {
        connectionInfo = info;
        _stream = stream;
        _writeMutex = new TaskMutex();
        _requestMutex = new TaskMutex();
        _readDone = createManualEvent();

        _pending.initialize();
        _readTask = runTask(&readTaskMain);
    }

public:
    mixin(generateImports!API);
    /// This implements the methods in API
    mixin(generateMethods!API);

    /**
     * Connection info passed in to createClientSession.
     */
    ConnectionInfo connectionInfo;

    /**
     * Whether client session is still connected.
     * Note: This only reflects the state of the RPC session. The underlying
     * stream is never closed by the client.
     */
    @property bool connected()
    {
        return _stream !is null;
    }

    /**
     * Called when disconnected.
     */
    Event!(RPCClient!(API, ConnectionInfo)) onDisconnect;

    /**
     * Sends disconnect signal to remote server and stops internal tasks.
     * Does nothing if the session is already disconnected. If sending the
     * signal fails the session is still shut down and the exception is
     * passed on to the caller.
     *
     * Note: Do not call any functions on this client instance after disconnecting.
     * This does not close the underlying stream. Recommended usage pattern:
     * -----------------------------
     * client.disconnect();
     * client.connectionInfo.close();
     * destroy(client);
     * -----------------------------
     */
    void disconnect()
    {
        if (!connected)
            return;

        synchronized (_writeMutex)
        {
            // The session may have been shut down while we were blocked
            // on the write lock.
            if (!connected)
                return;

            // Stop the internal tasks even if the signal can't be sent
            scope (exit)
                shutdown();

            _stream.serializeToJsonLine(RequestType.disconnect);
            _stream.flush();
        }
    }
}