1 /**
2  * Contains the RPC client API.
3  *  
4  * Note:
5  * This interface is decoupled from the underlying transport stream and
6  * therefore requires manually creating these streams. Higher level interfaces
7  * for common stream implementations are available in vibe.rpcchannel.tcp and
8  * vibe.rpcchannel.noise.
9  *
10  * Synopsis:
11  * --------
12  * abstract class API
13  * {
 *     void callMethod();
15  *     Event!() onEvent;
16  * }
17  *
18  * auto stream = connectTCP("127.0.0.1", 8080);
19  * auto client = createClientSession!(API, TCPConnection)(stream, stream);
20  * 
21  * client.callMethod();
22  * client.onEvent ~= () {};
23  * client.disconnect();
24  * stream.close();
25  * --------
26  */
27 module vibe.rpcchannel.client;
28 
29 import std.traits;
30 import std.range : ElementType;
31 import std.exception : enforceEx, collectException;
32 
33 import tinyevent;
34 import vibe.core.sync;
35 import vibe.core.stream;
36 import vibe.core.core;
37 
38 import vibe.rpcchannel.base;
39 import vibe.rpcchannel.protocol;
40 
41 import std.stdio;
42 
/**
 * Create a new RPCClient implementing API on top of stream.
 *
 * The info argument is handed to the client unchanged and is available
 * through the connectionInfo field of the returned client object.
 *
 * Params:
 *   stream = Stream used to exchange RPC messages with the server.
 *   info = Arbitrary connection information exposed as connectionInfo.
 *
 * Returns: The newly constructed client.
 *
 * Example:
 * -----
 * auto client = createClientSession!(API, string)(stream, "Hello");
 * assert(client.connectionInfo == "Hello");
 * -----
 */
RPCClient!(API, ConnectionInfo) createClientSession(API, ConnectionInfo)(
    Stream stream, ConnectionInfo info)
{
    return new RPCClient!(API, ConnectionInfo)(stream, info);
}
61 
/*
 * Build the source code (to be mixed into RPCClient) which overrides
 * every RPC function of API. Each generated override has parameters named
 * t0, t1, ... and simply forwards them, together with the member name and
 * the mangled function type, to the callMethod template.
 */
private string generateMethods(API)()
{
    import std.conv : to;

    string generated;

    foreach (member; APIFunctions!API)
    {
        foreach (MethodType; APIFunctionOverloads!(API, member))
        {
            alias Args = Parameters!MethodType;
            alias Ret = ReturnType!MethodType;
            enum mangle = typeof(MethodType).mangleof;

            // A single pass produces both the parameter declarations
            // ("int t0, int t1") and the forwarded arguments (", t0, t1").
            string paramDecls;
            string forwarded;
            foreach (idx, Arg; Args)
            {
                immutable argName = "t" ~ to!string(idx);
                if (idx != 0)
                    paramDecls ~= ", ";
                paramDecls ~= Arg.stringof ~ " " ~ argName;
                forwarded ~= ", " ~ argName;
            }

            immutable call = "callMethod!(" ~ Ret.stringof ~ " function"
                ~ Args.stringof ~ ")(\"" ~ member ~ "\", \"" ~ mangle ~ "\""
                ~ forwarded ~ ");";

            // Only non-void methods return the result of the call
            static if (is(Ret == void))
                enum prefix = "    ";
            else
                enum prefix = "    return ";

            generated ~= "override " ~ Ret.stringof ~ " " ~ member ~ "("
                ~ paramDecls ~ ")\n{\n" ~ prefix ~ call ~ "\n}\n\n";
        }
    }

    return generated;
}
107 
/*
 * Smoke test only: instantiate generateMethods for a small API containing
 * void and non-void functions, with and without parameters, so that all
 * code paths get compiled and run. The compiler does a much better job
 * checking the generated code when it is actually mixed in by the real
 * tests in test.d.
 */
unittest
{
    static abstract class TestAPI
    {
        void foo();
        int bar(int a, int b);
        @ignoreRPC int baz();
    }

    immutable generated = generateMethods!TestAPI();
}
124 
/*
 * Evaluates to the name of the module in which T is defined. For types
 * moduleName can not be applied to (e.g. built-in types) it evaluates to
 * an empty string.
 */
private template modName(T)
{
    static if (!__traits(compiles, moduleName!T))
        enum modName = "";
    else
        enum modName = moduleName!T;
}
136 
/*
 * Inspect the return and parameter types of all RPC functions in API and
 * generate one import statement for every module (as reported by modName)
 * defining one of these types. Types without a module name produce no
 * import.
 */
private string generateImports(API)()
{
    // Associative array used as a set; the stored values carry no meaning
    bool[string] seenModules;

    foreach (MethodType; APIOverloads!API)
    {
        seenModules[modName!(ReturnType!MethodType)] = true;

        foreach (ParamType; Parameters!MethodType)
            seenModules[modName!ParamType] = true;
    }

    string imports;
    foreach (name; seenModules.keys)
    {
        // modName yields "" when there is nothing to import
        if (name.length == 0)
            continue;
        imports ~= "import " ~ name ~ ";\n";
    }

    return imports;
}
165 
/*
 * Smoke test only: instantiate generateImports for an API which uses a
 * user-defined type as return type and as parameter type, so that all
 * code paths get compiled and run. The compiler does a much better job
 * checking the generated code when it is actually mixed in by the real
 * tests in test.d.
 */
unittest
{
    struct S
    {
    }

    static abstract class TestAPI
    {
        S foo();
        int bar(S a, int b);
        @ignoreRPC int baz();
    }

    immutable imports = generateImports!TestAPI();
}
186 
/*
 * State of the pending RPC call. The call is usually issued from a different
 * Fiber than the one processing responses (readTask), so the two have to be
 * synchronized; this is done through _readyEvent. Mutexes can't be used for
 * this as transferring lock ownership is not supported.
 */
private struct RPCRequest
{
    // Emitted whenever a new response is ready for the caller
    private ManualEvent _readyEvent;

    // When set, the resumed caller should throw an Exception
    bool exception = false;
    // Kind of the response that follows (only error/result is allowed)
    ResponseType type;

    // Set up this RPCRequest by creating its event
    void initialize()
    {
        this._readyEvent = createManualEvent();
    }

    // Wake up tasks blocked in waitReady
    void emitReady()
    {
        this._readyEvent.emit();
    }

    // Block the calling task until emitReady is called
    void waitReady()
    {
        this._readyEvent.wait();
    }
}
223 
/**
 * This implements an RPC client for API.
 *
 * The ConnectionInfo type is used to present information about the underlying
 * connection to the user. To do this, RPCClient exposes the connectionInfo
 * field of type ConnectionInfo.
 *
 * The client overrides all functions and events in API not marked with a
 * IgnoreUDA attribute. It additionally provides an onDisconnect event which
 * gets emitted when the connection gets disconnected locally or by the server.
 *
 * The disconnect method can be used to close the connection. The underlying
 * stream still has to get closed manually.
 *
 * Note: Event handlers are called from the Task processing the results of calls.
 * Because of this, event handlers may not directly call RPC methods. If you
 * need to call an RPC method from an event handler, spawn a new Task using
 * runTask first.
 */
class RPCClient(API, ConnectionInfo) : API
{
private:
    // Underlying data connection; null once the session has been shut down
    Stream _stream;
    // Serializes writes to _stream (requests and the disconnect signal)
    TaskMutex _writeMutex;
    // Serializes calls to callMethod so there is only one pending call
    TaskMutex _requestMutex;

    // Counter of message ids
    uint _idCounter = 0;

    // The task reading from the stream and dispatching events
    Task _readTask;
    // Signalled when a call result or event is done processing
    ManualEvent _readDone;
    // Whether we are processing an event or call result. Avoids interleaving
    bool _processingRead = false;

    // The pending call request
    RPCRequest _pending;

    // We're processing an event/response, do not interleave by reading other responses
    void startRead()
    {
        _processingRead = true;
    }

    // We've finished this response. Can now read next response.
    void finishRead()
    {
        _processingRead = false;
        _readDone.emit();
    }

    /*
     * We were somehow disconnected. Clean up all tasks and notify waiting
     * calls by throwing Exceptions.
     *
     * Marks the pending request as failed and wakes its waiter, emits
     * onDisconnect (only the first time, i.e. while _stream is still set)
     * and interrupts the read task unless we are running in it.
     */
    void shutdown() nothrow
    {
        _pending.exception = true;
        collectException(_pending.emitReady());

        if (_stream)
        {
            _stream = null;
            collectException(onDisconnect.emit(this));
        }

        if (Task.getThis != _readTask)
            collectException(_readTask.interrupt());
    }

    /*
     * Call remote method identified by name and mangle.
     * Serialize parameters and results according to MethodType.
     *
     * Throws:
     *   DisconnectedException if the session is not (or no longer) connected.
     *   RPCException if the server answered with an error response.
     *   Exceptions raised while (de)serializing messages are passed on.
     */
    ReturnType!MethodType callMethod(MethodType)(string name, string mangle,
        Parameters!MethodType args)
    {
        assert(Task.getThis != _readTask, "Can not call remote methods from event handler task");
        // Make sure there can't be multiple pending calls
        synchronized (_requestMutex)
        {
            scope (exit)
                finishRead();
            enforceEx!DisconnectedException(connected);

            alias TParams = Parameters!MethodType;
            alias TRet = ReturnType!MethodType;
            auto msgID = _idCounter++;

            // Remember the emit count *before* sending the request. Writing
            // may yield to the read task; if it signals the response (or a
            // shutdown) before we start waiting, a plain wait() would miss
            // that emit and block forever. Waiting relative to this count
            // returns immediately in that case. (_readyEvent is private to
            // RPCRequest but accessible here as both live in this module.)
            const readyCount = _pending._readyEvent.emitCount;

            // TODO: Do we have to handle disconnected exceptions?
            // Send request
            synchronized (_writeMutex)
            {
                // Acquiring the mutex may yield as well, so the session could
                // have been shut down (and _stream reset) in the meantime.
                enforceEx!DisconnectedException(connected);

                _stream.serializeToJsonLine(RequestType.call);
                auto msg = CallMessage(msgID, name, mangle, TParams.length);
                _stream.serializeToJsonLine(msg);
                foreach (arg; args)
                    _stream.serializeToJsonLine(arg);
                _stream.flush();
            }

            // Wait for response
            _pending._readyEvent.wait(readyCount);

            // If we were interrupted, disconnected, ...
            if (_pending.exception)
                throw new DisconnectedException();

            switch (_pending.type)
            {
            case ResponseType.error:
                ErrorMessage info;
                try
                    info = _stream.deserializeJsonLine!ErrorMessage();
                catch (Exception e)
                {
                    // Failing to read the message header is not recoverable
                    shutdown();
                    throw e;
                }
                throw new RPCException( /*info.type, */ info.message, info.file, info.line);
            case ResponseType.result:
                ResultMessage info;
                try
                    info = _stream.deserializeJsonLine!ResultMessage();
                catch (Exception e)
                {
                    // Failing to read the message header is not recoverable
                    shutdown();
                    throw e;
                }

                // Exceptions here caused by deserialization failure of the result are recoverable
                static if (is(TRet == void))
                {
                    // Discard result
                    if (info.hasResult)
                        _stream.deserializeJsonLine!void();
                    break;
                }
                else
                {
                    return _stream.deserializeJsonLine!TRet();
                }
            default:
                assert(false);
            }
        }
    }

    /*
     * Received event from server. Read the event parameters from the stream
     * and call all registered event handlers of the event called name.
     *
     * Exceptions thrown while reading parameters or by the handlers are
     * swallowed; the parameters not yet accounted for are skipped.
     */
    void emitEvent(string name)(EventMessage event)
    {
        alias EventType = ElementType!(typeof(__traits(getMember, typeof(this), name)));
        alias TArgs = Parameters!EventType;
        assert(event.parameters == TArgs.length);
        TArgs args;

        size_t parametersRead = 0;
        try
        {
            foreach (i, Arg; TArgs)
            {
                // Incremented before reading, so a parameter whose
                // deserialization fails is counted as consumed as well
                parametersRead++;
                args[i] = _stream.deserializeJsonLine!Arg();
            }
            mixin(`this.` ~ name ~ `.emit(args);`);
        }
        catch (Exception e)
        {
            _stream.skipParameters(event.parameters - parametersRead);
        }
    }

    /*
     * Read the EventMessage from the data stream and dispatch to
     * emitEvent. Events not matching any event of API by name and
     * parameter count are ignored and their parameters skipped.
     */
    void handleEventMessage()
    {
        scope (exit)
            finishRead();

        auto event = _stream.deserializeJsonLine!EventMessage();

        foreach (member; APIEvents!API)
        {
            alias EventType = ElementType!(typeof(__traits(getMember, API,
                member)));

            if (event.target == member && event.parameters == Parameters!(EventType).length)
            {
                emitEvent!member(event);
                return;
            }
        }

        // Ignore unhandled events
        _stream.skipParameters(event.parameters);
    }

    /*
     * This task is the only task reading the underlying connection.
     * Classify incoming messages as event/error/result and dispatch
     * to handleEventMessage or transfer control to the Task which originally
     * called the RPC method.
     *
     * Terminates silently when interrupted; any other exception shuts the
     * session down.
     */
    void readTaskMain()
    {
        try
        {
            while (true)
            {
                // If another task is busy processing a message
                if (_processingRead)
                    _readDone.wait();

                // Start processing a message
                startRead();

                const type = _stream.deserializeJsonLine!ResponseType();
                switch (type)
                {
                case ResponseType.disconnect:
                    shutdown();
                    return;
                case ResponseType.error:
                    goto case;
                case ResponseType.result:
                    // The calling task reads the rest of the response
                    _pending.type = type;
                    _pending.emitReady();
                    break;
                case ResponseType.event:
                    handleEventMessage();
                    break;
                default:
                    throw new Exception("Invalid response type");
                }
            }
        }
        catch (InterruptException)
        {
            // OK, terminate
        }
        catch (Exception e)
        {
            shutdown();
        }
    }

    /*
     * Construct a new RPCClient and start the task reading from stream.
     */
    this(Stream stream, ConnectionInfo info)
    {
        connectionInfo = info;
        _stream = stream;
        _writeMutex = new TaskMutex();
        _requestMutex = new TaskMutex();
        _readDone = createManualEvent();

        _pending.initialize();
        _readTask = runTask(&readTaskMain);
    }

public:
    mixin(generateImports!API);
    /// This implements the methods in API
    mixin(generateMethods!API);

    /**
     * Connection info passed in to createClientSession.
     */
    ConnectionInfo connectionInfo;

    /**
     * Whether client session is still connected.
     * Note: This does not necessarily mean the underlying stream is closed.
     */
    @property bool connected()
    {
        return _stream !is null;
    }

    /**
     * Called when disconnected.
     */
    Event!(RPCClient!(API, ConnectionInfo)) onDisconnect;

    /**
     * Sends disconnect signal to remote server and stops internal tasks.
     *
     * Does nothing if the client is already disconnected. If sending the
     * disconnect signal fails, the client is still shut down locally and
     * the exception is passed on to the caller.
     *
     * Note: Do not call any functions on this client instance after disconnecting.
     * This does not close the underlying stream. Recommended usage pattern:
     * -----------------------------
     * client.disconnect();
     * client.connectionInfo.close();
     * destroy(client);
     * -----------------------------
     */
    void disconnect()
    {
        if (!connected)
            return;

        synchronized (_writeMutex)
        {
            // Acquiring the mutex may yield to other tasks. The session may
            // have been shut down meanwhile, which resets _stream.
            if (!connected)
                return;

            // Always clean up local state, even if notifying the server fails
            scope (exit)
                shutdown();

            _stream.serializeToJsonLine(RequestType.disconnect);
            _stream.flush();
        }
    }
}