1 /**
2  * Implements the RPC server.
3  *
4  * This contains lower level APIs, see vibe.rpcchannel.tcp
5  * and vibe.rpcchannel.noise for a simpler API.
6  */
7 module vibe.rpcchannel.server;
8 
9 import std.traits;
10 import std.exception : enforceEx, collectException;
11 import std.range : ElementType;
12 
13 import tinyevent;
14 import vibe.core.stream;
15 import vibe.core.sync;
16 import vibe.core.core;
17 
18 import vibe.rpcchannel.base;
19 import vibe.rpcchannel.protocol;
20 
/**
 * Compile-time check whether `T` can act as an API server implementation
 * for connections described by `ConnectionInfo`.
 *
 * Evaluates to `true` if `T` provides a `startSession` callable which accepts
 * a `ConnectionInfo` value and whose result can be stored in a variable of
 * type `T` and passed to `destroy`.
 */
template isServerAPI(T, ConnectionInfo)
{
    enum bool isServerAPI = is(typeof({
        T instance = T.startSession(ConnectionInfo.init);
        destroy(instance);
    }));
}
28 
// A minimal class exposing a static startSession factory satisfies isServerAPI.
unittest
{
    static class DummyAPI
    {
        // Factory used by the trait; the connection info is ignored here.
        static DummyAPI startSession(void* connectionInfo)
        {
            return new DummyAPI();
        }
    }

    enum accepted = isServerAPI!(DummyAPI, void*);
    assert(accepted);
}
41 
/**
 * Creates a server session for one connection.
 *
 * Starts a new API session through `Implementation.startSession(info)` and
 * wraps the returned object together with `stream` in a `ServerSession!API`.
 * The returned session is only constructed here; processing requests is done
 * by calling `run` on it.
 *
 * Params:
 *   stream = Transport stream used to talk to the remote side.
 *   info = Connection information handed to `Implementation.startSession`.
 *
 * Returns: The newly constructed server session.
 *
 * Throws: Whatever `Implementation.startSession` or the session constructor
 * throws. In such cases the caller should close the underlying connection.
 */
ServerSession!API createServerSession(Implementation, API, ConnectionInfo)(
    Stream stream, ConnectionInfo info) if (isServerAPI!(Implementation, ConnectionInfo))
{
    return new ServerSession!API(Implementation.startSession(info), stream);
}
55 
/**
 * One instance of a client<->server communication server session.
 *
 * The session reads requests from the stream in `run`, dispatches calls to
 * the wrapped API object and writes results, errors and events back to the
 * stream. All writes are serialized through an internal mutex so that
 * messages are never interleaved.
 */
class ServerSession(API)
{
private:
    // The api implementation to call methods on
    API _api;
    // Underlying transport stream; set to null by shutdown()
    Stream _stream;
    // Need to make sure result and events are not interleaved
    TaskMutex _writeMutex;
    // The main task reading the _stream
    Task _runTask;

    /*
     * Throws InterruptException if the session has been shut down.
     *
     * Writers call this after acquiring _writeMutex: waiting for the mutex
     * can suspend the task, and disconnect() sets _stream to null while
     * holding that very mutex. Without the re-check a writer queued behind
     * a disconnect() would call methods on a null _stream.
     */
    void enforceStillConnected()
    {
        if (!connected)
            throw new InterruptException();
    }

    /*
     * A recoverable error occurred, send error response.
     *
     * Throws: InterruptException if the session was disconnected before the
     * write lock could be obtained.
     */
    void sendError(CallMessage info, ErrorType type, string msg = "", string file = "",
        size_t line = 0)
    {
        synchronized (_writeMutex)
        {
            // _stream may have been reset while we waited for the lock
            enforceStillConnected();

            serializeToJsonLine(_stream, ResponseType.error);
            auto err = ErrorMessage(info.id, type, msg, file, cast(uint) line);
            serializeToJsonLine(_stream, err);
            _stream.flush();
        }
    }

    /*
     * We received a call request. Now read parameters and call the function.
     *
     * Note: handles recoverable errors internally by sending an error
     * to the remote client. Non-recoverable errors propagate as an
     * Exception and should cause the connection to the client to terminate.
     */
    void callMethod(string member)(CallMessage info)
    {
        foreach (MethodType; APIFunctionOverloads!(API, member))
        {
            alias TParams = Parameters!MethodType;
            alias TRet = ReturnType!MethodType;
            enum mangle = typeof(MethodType).mangleof;
            TParams paramInst;

            // Select the overload by parameter count and mangled type
            if (info.parameters != TParams.length || info.mangle != mangle)
            {
                continue;
            }

            size_t paramsRead = 0;
            try
            {
                foreach (i, T; TParams)
                {
                    // Counted before reading, so a parameter that fails to
                    // deserialize is counted as read as well
                    paramsRead++;
                    paramInst[i] = _stream.deserializeJsonLine!(T)();
                }
            }
            catch (Exception e)
            {
                // If we failed to parse one parameter, skip rest of request
                _stream.skipParameters(info.parameters - paramsRead);
                // Only debug builds reveal exception details to the remote side
                debug
                    sendError(info, ErrorType.parameterMismatch, e.toString(), e.file,
                        e.line);
                else
                    sendError(info, ErrorType.parameterMismatch);
                return;
            }

            static if (!is(TRet == void))
                TRet result;

            try
            {
                static if (!is(TRet == void))
                    mixin(`result = _api.` ~ member ~ `(paramInst);`);
                else
                    mixin(`_api.` ~ member ~ `(paramInst);`);
            }
            catch (Exception e)
            {
                // In case the member function called disconnect. If the same
                // task called, there's no InterruptException thrown. Use the same
                // code path nevertheless.
                enforceStillConnected();

                // Only debug builds reveal exception details to the remote side
                debug
                    sendError(info, ErrorType.internalError, e.toString(), e.file,
                        e.line);
                else
                    sendError(info, ErrorType.internalError);
                return;
            }

            // In case the member function called disconnect. If the same
            // task called, there's no InterruptException thrown. Use the same
            // code path nevertheless.
            enforceStillConnected();

            // Write result Message
            synchronized (_writeMutex)
            {
                // _stream may have been reset while we waited for the lock
                enforceStillConnected();

                serializeToJsonLine(_stream, ResponseType.result);
                auto rMsg = ResultMessage(info.id, !is(TRet == void));
                serializeToJsonLine(_stream, rMsg);
                static if (!is(TRet == void))
                    serializeToJsonLine(_stream, result);
                _stream.flush();
            }
            return;
        }

        // Haven't found any overload
        _stream.skipParameters(info.parameters);
        sendError(info, ErrorType.parameterMismatch);
        return;
    }

    /*
     * This is a call request. Search for the correct function to be called.
     */
    void processCall()
    {
        auto info = _stream.deserializeJsonLine!CallMessage();

        foreach (member; APIFunctions!API)
        {
            if (info.target == member)
            {
                callMethod!(member)(info);
                return;
            }
        }

        // Only reached if no member matched, otherwise we returned already
        _stream.skipParameters(info.parameters);
        sendError(info, ErrorType.notImplemented);
    }

    /*
     * Check the current request type and then dispatch to processCall.
     *
     * Returns:
     * false if the remote side sent a disconnect request.
     *
     * Throws: Exception on an unknown request type.
     */
    bool processRequest()
    {
        auto type = _stream.deserializeJsonLine!RequestType();
        switch (type)
        {
        case RequestType.call:
            processCall();
            break;
        case RequestType.disconnect:
            return false;
        default:
            throw new Exception("Invalid request type");
        }
        return true;
    }

    /*
     * Register a callback for the event named name.
     * Once this event is called, send an EventMessage over the _stream.
     */
    void registerEvent(string name)()
    {
        alias EventType = ElementType!(typeof(__traits(getMember, API, name)));
        alias TRet = ReturnType!EventType;
        alias TArgs = Parameters!EventType;

        TRet onEvent(TArgs args)
        {
            assert(args.length < uint.max);

            // Fail early without waiting for the write lock
            enforceEx!DisconnectedException(connected);
            synchronized (_writeMutex)
            {
                // Re-check: the session may have been disconnected while this
                // task waited for the lock, in which case _stream is null now
                enforceEx!DisconnectedException(connected);

                _stream.serializeToJsonLine(ResponseType.event);
                auto msg = EventMessage(name, cast(uint) args.length);
                _stream.serializeToJsonLine(msg);
                foreach (arg; args)
                    _stream.serializeToJsonLine(arg);
                _stream.flush();
            }

            static if (is(TRet == bool))
                return true;
        }

        mixin(`_api.` ~ name ~ ` ~= &onEvent;`);
    }

    /*
     * Iterate all events in the _api and setup event handlers.
     */
    void registerEvents()
    {
        foreach (member; APIEvents!API)
        {
            registerEvent!member;
        }
    }

    /*
     * Construct a new RPC server session for api, communicating over stream.
     * Event handlers are registered on api right away.
     */
    this(API api, Stream stream)
    {
        _writeMutex = new TaskMutex();
        _api = api;
        _stream = stream;
        registerEvents();
    }

    /*
     * We were somehow disconnected. Mark the session as disconnected,
     * destroy the API instance and interrupt the run task if it is not
     * the calling task.
     */
    void shutdown() nothrow
    {
        if (_stream)
        {
            _stream = null;
        }

        collectException(destroy(_api));

        // Do this as the last thing: After run returns the _stream may become
        // invalid
        if (_runTask != Task.getThis)
            collectException(_runTask.interrupt());
    }

public:
    /**
     * Whether client session is still connected.
     * Note: This does not necessarily mean the underlying stream is closed.
     */
    @property bool connected()
    {
        return _stream !is null;
    }

    /**
     * Run one server session.
     * 
     * This returns after a clean shutdown or throws an Exception if an error
     * occurred.
     */
    void run()
    {
        _runTask = Task.getThis();
        try
        {
            bool next;
            do
            {
                next = processRequest();
            }
            while (next);

            shutdown();
        }
        catch (InterruptException)
        {
            // OK: raised when the session was disconnected locally
        }
        catch (Exception e)
        {
            shutdown();
            throw e;
        }
    }

    /**
     * Sends disconnect signal to the remote side and stops internal tasks.
     * 
     * Note: Do not call any functions on this instance after disconnecting.
     * Emitting further events will throw DisconnectedExceptions. The API instance
     * will get destroy() ed.
     *
     * Calling this on an already disconnected session does nothing.
     *
     * This does not close the underlying stream.
     */
    void disconnect()
    {
        if (!connected)
            return;

        synchronized (_writeMutex)
        {
            // Another task may have completed a disconnect while we waited
            // for the lock; _stream is null in that case
            if (!connected)
                return;

            _stream.serializeToJsonLine(ResponseType.disconnect);
            _stream.flush();
            shutdown();
        }
    }
}