1 /** 2 * Provides high level interface for TCP based RPC. 3 * 4 */ 5 module vibe.rpcchannel.tcp; 6 7 import vibe.core.net; 8 import vibe.rpcchannel.server; 9 import vibe.rpcchannel.client; 10 11 /** 12 * The RPC server implementation using a unencrypted TCPConnection transport stream. 13 */ 14 class TCPServer(Implementation, API) 15 { 16 static struct TCPServerSession 17 { 18 ServerSession!API session; 19 TCPConnection conn; 20 } 21 22 private: 23 TCPListener[] _listener; 24 TCPServerSession[] _sessions; 25 26 /* 27 * Called if other side disconnected first. 28 */ 29 void onDisconnect(ServerSession!API session) 30 { 31 size_t j = 0; 32 for (size_t i = 0; i < _sessions.length; i++) 33 { 34 if (_sessions[i].session != session) 35 { 36 _sessions[j] = _sessions[i]; 37 j++; 38 } 39 } 40 _sessions.length = j; 41 } 42 43 /* 44 * Called when TCP listener receives a new connection. 45 */ 46 void onTCPConnect(TCPConnection conn) 47 { 48 TCPServerSession tcp; 49 tcp.session = createServerSession!(Implementation, API)(conn, conn); 50 tcp.conn = conn; 51 _sessions ~= tcp; 52 scope (exit) 53 { 54 onDisconnect(tcp.session); 55 } 56 tcp.session.run(); 57 } 58 59 public: 60 /** 61 * Stop listening for new connections and cleanly disconnect all 62 * existing connections. 63 */ 64 void stop() 65 { 66 foreach (listener; _listener) 67 listener.stopListening(); 68 69 foreach (session; _sessions) 70 { 71 session.session.disconnect(); 72 } 73 } 74 } 75 76 /** 77 * Create a new TCP based API server. 78 */ 79 TCPServer!(Implementation, API) serveTCP(Implementation, API)(ushort port) if ( 80 isServerAPI!(Implementation, TCPConnection)) 81 { 82 auto server = new TCPServer!(Implementation, API)(); 83 server._listener = listenTCP(port, &server.onTCPConnect); 84 return server; 85 } 86 87 /// 88 unittest 89 { 90 abstract static class API 91 { 92 string someMethod(string name); 93 } 94 95 static class Implementation : API 96 { 97 static Implementation startSession(TCPConnection conn) 98 { 99 return new Implementation(); 100 } 101 102 override string someMethod(string name) 103 { 104 return "Hello " ~ name ~ "!"; 105 } 106 } 107 108 auto server = serveTCP!(Implementation, API)(8030); 109 server.stop(); 110 } 111 112 /// ditto 113 TCPServer!(Implementation, API) serveTCP(Implementation, API)(ushort port, string address) if ( 114 isServerAPI!(Implementation, TCPConnection)) 115 { 116 auto server = new TCPServer!(Implementation, API)(); 117 server._listener = [listenTCP(port, &server.onTCPConnect, address)]; 118 return server; 119 } 120 121 /** 122 * An alias for a TCP based RPCClient. 123 */ 124 template TCPClient(API) 125 { 126 alias TCPClient = RPCClient!(API, TCPConnection); 127 } 128 129 /** 130 * Connect to a remote, TCP based API server. 131 * 132 * Examples: 133 * ------- 134 * abstract static class API 135 * { 136 * string someMethod(string name); 137 * } 138 * auto client = clientTCP!API("127.0.0.1", 8030); 139 * client.someMethod("john"); 140 * client.closeTCP(); 141 * ------ 142 */ 143 auto clientTCP(API)(string host, ushort port, string bind_interface = null, 144 ushort bind_port = cast(ushort) 0u) 145 { 146 auto stream = connectTCP(host, port, bind_interface, bind_port); 147 return createClientSession!(API, TCPConnection)(stream, stream); 148 } 149 150 /// ditto 151 auto clientTCP(API)(NetworkAddress addr, NetworkAddress bind_address = anyAddress()) 152 { 153 auto stream = connectTCP(addr, bind_address); 154 return createClientSession!(API, TCPConnection)(stream, stream); 155 } 156 157 /** 158 * Completely close the connections maintained by a TCPClient 159 */ 160 void closeTCP(Client)(Client client) 161 { 162 client.disconnect(); 163 if (client.connectionInfo.connected) 164 client.connectionInfo.close(); 165 }