/**
 * Implements the RPC server.
 *
 * This contains lower level APIs, see vibe.rpcchannel.tcp
 * and vibe.rpcchannel.noise for a simpler API.
 */
module vibe.rpcchannel.server;

import std.traits;
import std.exception : enforceEx, collectException;
import std.range : ElementType;

import tinyevent;
import vibe.core.stream;
import vibe.core.sync;
import vibe.core.core;

import vibe.rpcchannel.base;
import vibe.rpcchannel.protocol;

/**
 * Compile-time check whether `T` can act as an API server implementation.
 *
 * `T` qualifies if a static `T.startSession` can be called with a
 * `ConnectionInfo` value, yields something assignable to a `T`, and that
 * instance can be passed to `destroy`.
 */
enum bool isServerAPI(T, ConnectionInfo) = is(typeof({
    T instance = T.startSession(ConnectionInfo.init);
    destroy(instance);
}));

unittest
{
    // Minimal implementation: only the static session factory is required.
    static class DummyAPI
    {
        static DummyAPI startSession(void* connInfo)
        {
            return new DummyAPI();
        }
    }

    assert(isServerAPI!(DummyAPI, void*));
}

/**
 * Creates a server session on top of `stream`.
 *
 * Starts a new `Implementation` session by calling
 * `Implementation.startSession(info)` and wraps the result together with
 * `stream` in a `ServerSession`. This function only constructs the session;
 * request processing happens in `ServerSession.run`, which the caller has to
 * invoke on the returned object.
 *
 * Params:
 *   stream = Transport stream used to talk to the remote client.
 *   info = Connection information handed to `Implementation.startSession`.
 *
 * Returns: The newly created session.
 *
 * Throws: Exceptions raised while starting the session propagate to the
 *   caller. In such cases the caller should close the underlying connection.
 */
ServerSession!API createServerSession(Implementation, API, ConnectionInfo)(
    Stream stream, ConnectionInfo info) if (isServerAPI!(Implementation, ConnectionInfo))
{
    return new ServerSession!API(Implementation.startSession(info), stream);
}

/**
 * One instance of a client<->server communication server session.
58 */ 59 class ServerSession(API) 60 { 61 private: 62 // The api implementation to call methods on 63 API _api; 64 // Underlying transport stream 65 Stream _stream; 66 // Need to make sure result and events are not interleaved 67 TaskMutex _writeMutex; 68 // The main task reading the _stream 69 Task _runTask; 70 71 /* 72 * A recoverable error occured, send error response 73 */ 74 void sendError(CallMessage info, ErrorType type, string msg = "", string file = "", 75 size_t line = 0) 76 { 77 synchronized (_writeMutex) 78 { 79 serializeToJsonLine(_stream, ResponseType.error); 80 auto err = ErrorMessage(info.id, type, msg, file, cast(uint) line); 81 serializeToJsonLine(_stream, err); 82 _stream.flush(); 83 } 84 } 85 86 /* 87 * We received a call reqest. Now read parameters and call the function. 88 * 89 * Note: handles recoverable errors internally by sending an error 90 * to the remote client. Non-recoverable errors propagate as an 91 * Exception and should cause the connection to the client to terminate. 
92 */ 93 void callMethod(string member)(CallMessage info) 94 { 95 foreach (MethodType; APIFunctionOverloads!(API, member)) 96 { 97 alias TParams = Parameters!MethodType; 98 alias TRet = ReturnType!MethodType; 99 enum mangle = typeof(MethodType).mangleof; 100 TParams paramInst; 101 102 if (info.parameters != TParams.length || info.mangle != mangle) 103 { 104 continue; 105 } 106 107 size_t paramsRead = 0; 108 try 109 { 110 foreach (i, T; TParams) 111 { 112 paramsRead++; 113 paramInst[i] = _stream.deserializeJsonLine!(T)(); 114 } 115 } 116 catch (Exception e) 117 { 118 // If we failed to parse one parameter, skip rest of request 119 _stream.skipParameters(info.parameters - paramsRead); 120 debug sendError(info, ErrorType.parameterMismatch, e.toString(), e.file, 121 e.line); 122 else 123 sendError(info, ErrorType.parameterMismatch); 124 return; 125 } 126 127 static if (!is(TRet == void)) 128 TRet result; 129 130 try 131 { 132 static if (!is(TRet == void)) 133 mixin(`result = _api.` ~ member ~ `(paramInst);`); 134 else 135 mixin(`_api.` ~ member ~ `(paramInst);`); 136 } 137 catch (Exception e) 138 { 139 // In case the member function called disconnect. If the same 140 // task called, there's no InterruptException thrown. Use the same 141 // code path nevertheless. 142 if (!connected) 143 throw new InterruptException(); 144 145 debug sendError(info, ErrorType.internalError, e.toString(), e.file, 146 e.line); 147 else 148 sendError(info, ErrorType.internalError); 149 return; 150 } 151 152 // In case the member function called disconnect. If the same 153 // task called, there's no InterruptException thrown. Use the same 154 // code path nevertheless. 
155 if (!connected) 156 throw new InterruptException(); 157 158 // Write result Message 159 synchronized (_writeMutex) 160 { 161 serializeToJsonLine(_stream, ResponseType.result); 162 auto rMsg = ResultMessage(info.id, !is(TRet == void)); 163 serializeToJsonLine(_stream, rMsg); 164 static if (!is(TRet == void)) 165 serializeToJsonLine(_stream, result); 166 _stream.flush(); 167 } 168 return; 169 } 170 171 // Haven't found any overload 172 _stream.skipParameters(info.parameters); 173 sendError(info, ErrorType.parameterMismatch); 174 return; 175 } 176 177 /* 178 * This is a call request. Search for the correct function to be called. 179 */ 180 void processCall() 181 { 182 auto info = _stream.deserializeJsonLine!CallMessage(); 183 184 foreach (member; APIFunctions!API) 185 { 186 if (info.target == member) 187 { 188 callMethod!(member)(info); 189 return; 190 } 191 } 192 193 // If we have not found a member, otherwise we returned already 194 _stream.skipParameters(info.parameters); 195 sendError(info, ErrorType.notImplemented); 196 } 197 198 /* 199 * Check for current request type and the dispatch to processCall. 200 * 201 * Returns: 202 * false if remote send disconnect request. 203 */ 204 bool processRequest() 205 { 206 auto type = _stream.deserializeJsonLine!RequestType(); 207 switch (type) 208 { 209 case RequestType.call: 210 processCall(); 211 break; 212 case RequestType.disconnect: 213 return false; 214 default: 215 throw new Exception("Invalid request type"); 216 } 217 return true; 218 } 219 220 /* 221 * Register a callback for the event named name. 222 * Once this event is called, send an EventMessage over the _stream. 
223 */ 224 void registerEvent(string name)() 225 { 226 alias EventType = ElementType!(typeof(__traits(getMember, API, name))); 227 alias TRet = ReturnType!EventType; 228 alias TArgs = Parameters!EventType; 229 230 TRet onEvent(TArgs args) 231 { 232 assert(args.length < uint.max); 233 234 enforceEx!DisconnectedException(connected); 235 synchronized (_writeMutex) 236 { 237 _stream.serializeToJsonLine(ResponseType.event); 238 auto msg = EventMessage(name, cast(uint) args.length); 239 _stream.serializeToJsonLine(msg); 240 foreach (arg; args) 241 _stream.serializeToJsonLine(arg); 242 _stream.flush(); 243 } 244 245 static if (is(TRet == bool)) 246 return true; 247 } 248 249 mixin(`_api.` ~ name ~ ` ~= &onEvent;`); 250 } 251 252 /* 253 * Iterate all events in the _api and setup event handlers. 254 */ 255 void registerEvents() 256 { 257 foreach (member; APIEvents!API) 258 { 259 registerEvent!member; 260 } 261 } 262 263 /* 264 * Construct a new RPC server. 265 */ 266 this(API api, Stream stream) 267 { 268 _writeMutex = new TaskMutex(); 269 _api = api; 270 _stream = stream; 271 registerEvents(); 272 } 273 274 /* 275 * We were somehow disconnected. Clean up all tasks and notify waiting 276 * calls by throwing Exceptions. 277 */ 278 void shutdown() nothrow 279 { 280 if (_stream) 281 { 282 _stream = null; 283 } 284 285 collectException(destroy(_api)); 286 287 // Do this as the last thing: After run returns the _stream may become 288 // invalid 289 if (_runTask != Task.getThis) 290 collectException(_runTask.interrupt()); 291 } 292 293 public: 294 /** 295 * Whether client session is still connected. 296 * Note: This does not necessarily mean the underlying stream is closed. 297 */ 298 @property bool connected() 299 { 300 return _stream !is null; 301 } 302 303 /** 304 * Run one server session. 305 * 306 * This returns after a clean shutdown or throws an Exception if an error 307 * occured. 
308 */ 309 void run() 310 { 311 _runTask = Task.getThis(); 312 try 313 { 314 bool next; 315 do 316 { 317 next = processRequest(); 318 } 319 while (next); 320 321 shutdown(); 322 } 323 catch (InterruptException) 324 { 325 // OK 326 } 327 catch (Exception e) 328 { 329 shutdown(); 330 throw e; 331 } 332 } 333 334 /** 335 * Sends disconnect signal to remote server and stops internal tasks. 336 * 337 * Note: Do not call any functions on this instance after disconnecting. 338 * Emitting further events will throw DisconnectedExceptions. The API instance 339 * will get destroy() ed. 340 * 341 * This does not close the underlying stream. 342 */ 343 void disconnect() 344 { 345 if (!connected) 346 return; 347 348 synchronized (_writeMutex) 349 { 350 _stream.serializeToJsonLine(ResponseType.disconnect); 351 _stream.flush(); 352 shutdown(); 353 } 354 } 355 }