diff options
| author | Ralph Amissah <ralph.amissah@gmail.com> | 2020-04-02 17:52:14 -0400 | 
|---|---|---|
| committer | Ralph Amissah <ralph.amissah@gmail.com> | 2020-05-20 11:27:25 -0400 | 
| commit | 408a0f83a64d6f1e9d702dc805ce0c670a5aa472 (patch) | |
| tree | 31028f542d201d08f99d688329aa584723d20627 | |
| parent | cgi.d by Adam Ruppe used, license Boost (diff) | |
cgi.d track changes by Adam Ruppe
| -rw-r--r-- | ext_lib/src/arsd/cgi.d | 372 | 
1 files changed, 321 insertions, 51 deletions
| diff --git a/ext_lib/src/arsd/cgi.d b/ext_lib/src/arsd/cgi.d index 681fae4..caee996 100644 --- a/ext_lib/src/arsd/cgi.d +++ b/ext_lib/src/arsd/cgi.d @@ -4664,6 +4664,37 @@ version(cgi_with_websocket) {  		// returns true if data available, false if it timed out  		bool recvAvailable(Duration timeout = dur!"msecs"(0)) { +			if(!waitForNextMessageWouldBlock()) +				return true; +			if(isDataPending(timeout)) +				return true; // this is kinda a lie. + +			return false; +		} + +		public bool lowLevelReceive() { +			auto bfr = cgi.idlol; +			top: +			auto got = bfr.front; +			if(got.length) { +				if(receiveBuffer.length < receiveBufferUsedLength + got.length) +					receiveBuffer.length += receiveBufferUsedLength + got.length; + +				receiveBuffer[receiveBufferUsedLength .. receiveBufferUsedLength + got.length] = got[]; +				receiveBufferUsedLength += got.length; +				bfr.consume(got.length); + +				return true; +			} + +			bfr.popFront(0); +			if(bfr.sourceClosed) +				return false; +			goto top; +		} + + +		bool isDataPending(Duration timeout = 0.seconds) {  			Socket socket = cgi.idlol.source;  			auto check = new SocketSet(); @@ -4676,47 +4707,297 @@ version(cgi_with_websocket) {  		}  		// note: this blocks -		WebSocketMessage recv() { -			// FIXME: should we automatically handle pings and pongs? -			if(cgi.idlol.empty()) -				throw new Exception("remote side disconnected"); -			cgi.idlol.popFront(0); +		WebSocketFrame recv() { +			return waitForNextMessage(); +		} -			WebSocketMessage message; -			message = WebSocketMessage.read(cgi.idlol); -			return message; + +		private void llclose() { +			cgi.close();  		} -		void send(in char[] text) { -			// I cast away const here because I know this msg is private and it doesn't write -			// to that buffer unless masking is set... which it isn't, so we're ok. -			auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.text, cast(void[]) text); -			msg.send(cgi); +		private void llsend(ubyte[] data) { +			cgi.write(data); +			cgi.flush();  		} -		void send(in ubyte[] binary) { -			// I cast away const here because I know this msg is private and it doesn't write -			// to that buffer unless masking is set... which it isn't, so we're ok. -			auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.binary, cast(void[]) binary); -			msg.send(cgi); +		void unregisterActiveSocket(WebSocket) {} + +		/* copy/paste section { */ + +		private int readyState_; +		private ubyte[] receiveBuffer; +		private size_t receiveBufferUsedLength; + +		private Config config; + +		enum CONNECTING = 0; /// Socket has been created. The connection is not yet open. +		enum OPEN = 1; /// The connection is open and ready to communicate. +		enum CLOSING = 2; /// The connection is in the process of closing. +		enum CLOSED = 3; /// The connection is closed or couldn't be opened. + +		/++ + +		+/ +		/// Group: foundational +		static struct Config { +			/++ +				These control the size of the receive buffer. + +				It starts at the initial size, will temporarily +				balloon up to the maximum size, and will reuse +				a buffer up to the likely size. + +				Anything larger than the maximum size will cause +				the connection to be aborted and an exception thrown. +				This is to protect you against a peer trying to +				exhaust your memory, while keeping the user-level +				processing simple. +			+/ +			size_t initialReceiveBufferSize = 4096; +			size_t likelyReceiveBufferSize = 4096; /// ditto +			size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto + +			/++ +				Maximum combined size of a message. +			+/ +			size_t maximumMessageSize = 10 * 1024 * 1024; + +			string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; +			string origin; /// Origin URL to send with the handshake, if desired. +			string protocol; /// the protocol header, if desired. + +			int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping  		} -		void close() { -			auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.close, null); -			msg.send(cgi); +		/++ +			Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED]. +		+/ +		int readyState() { +			return readyState_;  		} +		/++ +			Closes the connection, sending a graceful teardown message to the other side. +		+/ +		/// Group: foundational +		void close(int code = 0, string reason = null) +			//in (reason.length < 123) +			in { assert(reason.length < 123); } do +		{ +			if(readyState_ != OPEN) +				return; // it cool, we done +			WebSocketFrame wss; +			wss.fin = true; +			wss.opcode = WebSocketOpcode.close; +			wss.data = cast(ubyte[]) reason; +			wss.send(&llsend); + +			readyState_ = CLOSING; + +			llclose(); +		} + +		/++ +			Sends a ping message to the server. This is done automatically by the library if you set a non-zero [Config.pingFrequency], but you can also send extra pings explicitly as well with this function. +		+/ +		/// Group: foundational  		void ping() { -			auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.ping, null); -			msg.send(cgi); +			WebSocketFrame wss; +			wss.fin = true; +			wss.opcode = WebSocketOpcode.ping; +			wss.send(&llsend);  		} +		// automatically handled....  		void pong() { -			auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.pong, null); -			msg.send(cgi); +			WebSocketFrame wss; +			wss.fin = true; +			wss.opcode = WebSocketOpcode.pong; +			wss.send(&llsend);  		} + +		/++ +			Sends a text message through the websocket. +		+/ +		/// Group: foundational +		void send(in char[] textData) { +			WebSocketFrame wss; +			wss.fin = true; +			wss.opcode = WebSocketOpcode.text; +			wss.data = cast(ubyte[]) textData; +			wss.send(&llsend); +		} + +		/++ +			Sends a binary message through the websocket. +		+/ +		/// Group: foundational +		void send(in ubyte[] binaryData) { +			WebSocketFrame wss; +			wss.fin = true; +			wss.opcode = WebSocketOpcode.binary; +			wss.data = cast(ubyte[]) binaryData; +			wss.send(&llsend); +		} + +		/++ +			Waits for and returns the next complete message on the socket. + +			Note that the onmessage function is still called, right before +			this returns. +		+/ +		/// Group: blocking_api +		public WebSocketFrame waitForNextMessage() { +			do { +				auto m = processOnce(); +				if(m.populated) +					return m; +			} while(lowLevelReceive()); + +			return WebSocketFrame.init; // FIXME? maybe. +		} + +		/++ +			Tells if [waitForNextMessage] would block. +		+/ +		/// Group: blocking_api +		public bool waitForNextMessageWouldBlock() { +			checkAgain: +			if(isMessageBuffered()) +				return false; +			if(!isDataPending()) +				return true; +			while(isDataPending()) +				lowLevelReceive(); +			goto checkAgain; +		} + +		/++ +			Is there a message in the buffer already? +			If `true`, [waitForNextMessage] is guaranteed to return immediately. +			If `false`, check [isDataPending] as the next step. +		+/ +		/// Group: blocking_api +		public bool isMessageBuffered() { +			ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; +			auto s = d; +			if(d.length) { +				auto orig = d; +				auto m = WebSocketFrame.read(d); +				// that's how it indicates that it needs more data +				if(d !is orig) +					return true; +			} + +			return false; +		} + +		private ubyte continuingType; +		private ubyte[] continuingData; +		//private size_t continuingDataLength; + +		private WebSocketFrame processOnce() { +			ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; +			auto s = d; +			// FIXME: handle continuation frames more efficiently. it should really just reuse the receive buffer. +			WebSocketFrame m; +			if(d.length) { +				auto orig = d; +				m = WebSocketFrame.read(d); +				// that's how it indicates that it needs more data +				if(d is orig) +					return WebSocketFrame.init; +				switch(m.opcode) { +					case WebSocketOpcode.continuation: +						if(continuingData.length + m.data.length > config.maximumMessageSize) +							throw new Exception("message size exceeded"); + +						continuingData ~= m.data; +						if(m.fin) { +							if(ontextmessage) +								ontextmessage(cast(char[]) continuingData); +							if(onbinarymessage) +								onbinarymessage(continuingData); + +							continuingData = null; +						} +					break; +					case WebSocketOpcode.text: +						if(m.fin) { +							if(ontextmessage) +								ontextmessage(m.textData); +						} else { +							continuingType = m.opcode; +							//continuingDataLength = 0; +							continuingData = null; +							continuingData ~= m.data; +						} +					break; +					case WebSocketOpcode.binary: +						if(m.fin) { +							if(onbinarymessage) +								onbinarymessage(m.data); +						} else { +							continuingType = m.opcode; +							//continuingDataLength = 0; +							continuingData = null; +							continuingData ~= m.data; +						} +					break; +					case WebSocketOpcode.close: +						readyState_ = CLOSED; +						if(onclose) +							onclose(); + +						unregisterActiveSocket(this); +					break; +					case WebSocketOpcode.ping: +						pong(); +					break; +					case WebSocketOpcode.pong: +						// just really references it is still alive, nbd. +					break; +					default: // ignore though i could and perhaps should throw too +				} +			} +			receiveBufferUsedLength -= s.length - d.length; + +			return m; +		} + +		private void autoprocess() { +			// FIXME +			do { +				processOnce(); +			} while(lowLevelReceive()); +		} + + +		void delegate() onclose; /// +		void delegate() onerror; /// +		void delegate(in char[]) ontextmessage; /// +		void delegate(in ubyte[]) onbinarymessage; /// +		void delegate() onopen; /// + +		/++ + +		+/ +		/// Group: browser_api +		void onmessage(void delegate(in char[]) dg) { +			ontextmessage = dg; +		} + +		/// ditto +		void onmessage(void delegate(in ubyte[]) dg) { +			onbinarymessage = dg; +		} + +		/* } end copy/paste */ + +  	}  	bool websocketRequested(Cgi cgi) { @@ -4755,10 +5036,11 @@ version(cgi_with_websocket) {  		return new WebSocket(cgi);  	} -	// FIXME: implement websocket extension frames -	// get websocket to work on other modes, not just embedded_httpd +	// FIXME get websocket to work on other modes, not just embedded_httpd +	/* copy/paste in http2.d { */  	enum WebSocketOpcode : ubyte { +		continuation = 0,  		text = 1,  		binary = 2,  		// 3, 4, 5, 6, 7 RESERVED @@ -4768,7 +5050,7 @@ version(cgi_with_websocket) {  		// 11,12,13,14,15 RESERVED  	} -	struct WebSocketMessage { +	public struct WebSocketFrame {  		private bool populated;  		bool fin;  		bool rsv1; @@ -4781,8 +5063,8 @@ version(cgi_with_websocket) {  		ubyte[4] maskingKey; // don't set this when sending  		ubyte[] data; -		static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) { -			WebSocketMessage msg; +		static WebSocketFrame simpleMessage(WebSocketOpcode opcode, void[] data) { +			WebSocketFrame msg;  			msg.fin = true;  			msg.opcode = opcode;  			msg.data = cast(ubyte[]) data; @@ -4790,7 +5072,7 @@ version(cgi_with_websocket) {  			return msg;  		} -		private void send(Cgi cgi) { +		private void send(scope void delegate(ubyte[]) llsend) {  			ubyte[64] headerScratch;  			int headerScratchPos = 0; @@ -4846,7 +5128,7 @@ version(cgi_with_websocket) {  				headerScratch[1] = b2;  			} -			assert(!masked, "masking key not properly implemented"); +			//assert(!masked, "masking key not properly implemented");  			if(masked) {  				// FIXME: randomize this  				headerScratch[headerScratchPos .. headerScratchPos + 4] = maskingKey[]; @@ -4864,19 +5146,18 @@ version(cgi_with_websocket) {  			}  			//writeln("SENDING ", headerScratch[0 .. headerScratchPos], data); -			cgi.write(headerScratch[0 .. headerScratchPos]); -			cgi.write(data); -			cgi.flush(); +			llsend(headerScratch[0 .. headerScratchPos]); +			llsend(data);  		} -		static WebSocketMessage read(ref ubyte[] d) { -			WebSocketMessage msg; +		static WebSocketFrame read(ref ubyte[] d) { +			WebSocketFrame msg;  			auto orig = d; -			WebSocketMessage needsMoreData() { +			WebSocketFrame needsMoreData() {  				d = orig; -				return WebSocketMessage.init; +				return WebSocketFrame.init;  			}  			if(d.length < 2) @@ -4957,22 +5238,11 @@ version(cgi_with_websocket) {  			return msg;  		} -		static WebSocketMessage read(BufferedInputRange ir) { -			readmore: -			auto d = ir.front(); -			auto m = read(d); -			if(m is WebSocketMessage.init) { -				ir.popFront(); -				goto readmore; -			} -			return m; -		} -  		char[] textData() {  			return cast(char[]) data;  		}  	} - +	/* } */  } | 
