using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Coolape;
using UnityEngine;
using XLua;

namespace Dist.SpringWebsocket
{
    /// <summary>
    /// STOMP client running over a <see cref="ClientWebSocket"/>.
    /// Frames received from the socket are parsed and queued; queued frames are
    /// handed to their callbacks from <see cref="Update"/>, one per Unity frame.
    /// </summary>
    public class Client : MonoBehaviour
    {
        /// <summary>STOMP protocol versions offered in the CONNECT frame.</summary>
        public static string StompPrototypeVersion = "accept-version:1.1,1.0";

        /// <summary>Heart-beat header sent in the CONNECT frame.</summary>
        public static string HeartBeating = "heart-beat:10000,10000";

        /// <summary>Line feed, used when building STOMP frames.</summary>
        public static Char LF = Convert.ToChar(10);

        /// <summary>NUL character, used to terminate STOMP frames.</summary>
        public static Char NULL = Convert.ToChar(0);

        /// <summary>Connection type of this endpoint.</summary>
        public static string TYPE = "client";

        /// <summary>
        /// Raw bytes received so far that do not yet form a complete frame.
        /// Static, therefore shared by every <see cref="Client"/> instance.
        /// </summary>
        public static MemoryStream receivedBuffer = new MemoryStream();

        /// <summary>Size (in bytes) of the receive buffer constant exposed to callers.</summary>
        public static int __maxLen = 1024 * 1024;

        /// <summary>The most recently constructed client.</summary>
        public static Client self;

        private static string SubscriptionHeader = "subscription";
        private static string DestinationHeader = "destination";
        private static string ContentLengthHeader = "content-length";
        private static string CID = "dist-connect";
        private static int COUNTER = 0;

        /// <summary>Underlying WebSocket; null after the connection was torn down.</summary>
        public ClientWebSocket socket;

        /// <summary>True once a CONNECTED frame was received, false after disconnect/close.</summary>
        public Boolean connected = false;

        /// <summary>Encoded frames waiting to be written to the socket.</summary>
        public Queue<ArraySegment<byte>> sendQueue = new Queue<ArraySegment<byte>>();

        /// <summary>True while the send loop is draining <see cref="sendQueue"/>.</summary>
        public bool isSending = false;

        /// <summary>Enables verbose logging of sent and received data.</summary>
        public bool isDebug = false;

        // Frames waiting to be delivered to their callbacks from Update().
        Queue<StompFrame> queueCallback = new Queue<StompFrame>();

        // Initialised eagerly so Subscribe/IsSubscribed/Connect do not fail before init().
        private Dictionary<string, object> callbacks = new Dictionary<string, object>();
        private Dictionary<string, string> subscribes = new Dictionary<string, string>();

        private object statusCallback;
        private string url;

        // Private lock target; locking on `this` would expose the lock to outside code.
        private readonly object gate = new object();

        public Client()
        {
            self = this;
        }

        /// <summary>Server URL used by <see cref="init"/>.</summary>
        public string URL
        {
            get { return url; }
            set { this.url = value; }
        }

        /// <summary>
        /// Opens the WebSocket to <paramref name="url"/>, reports the result through
        /// <paramref name="callback"/> (as a queued status frame) and starts the receive loop.
        /// </summary>
        /// <param name="url">WebSocket endpoint.</param>
        /// <param name="callback">Callback that receives connection status frames.</param>
        public async void init(string url, object callback)
        {
            this.url = url;
            this.statusCallback = callback;
            this.callbacks = new Dictionary<string, object>();
            this.subscribes = new Dictionary<string, string>();
            try
            {
                this.socket = new ClientWebSocket();
                // NOTE(review): this blocks the calling thread until the socket is connected.
                // Kept synchronous because callers may rely on the socket being open when
                // init returns - confirm before converting this to `await`.
                socket.ConnectAsync(new Uri(url), CancellationToken.None).Wait();
                socket_Opened(this);
                await StartReceiving(socket);
            }
            catch (Exception e)
            {
                Debug.LogError(e);
                socket_Error(this, e.ToString());
            }
        }

        /// <summary>
        /// Returns true when <paramref name="client"/> is null or in a state in which it can
        /// no longer send or receive (Aborted, CloseSent, CloseReceived, Closed).
        /// </summary>
        private static bool IsUnusable(ClientWebSocket client)
        {
            if (client == null)
            {
                return true;
            }
            switch (client.State)
            {
                case WebSocketState.Aborted:
                case WebSocketState.CloseSent:
                case WebSocketState.CloseReceived:
                case WebSocketState.Closed:
                    return true;
                default:
                    return false;
            }
        }

        /// <summary>
        /// Receive loop: reads messages from <paramref name="client"/> until the peer closes
        /// the connection, the socket becomes unusable, or an exception occurs.
        /// </summary>
        async Task StartReceiving(ClientWebSocket client)
        {
            try
            {
                var array = new byte[1024 * 1024];
                var arraySegment = new ArraySegment<byte>(array);
                while (true)
                {
                    var result = await client.ReceiveAsync(arraySegment, CancellationToken.None);

                    // A close message carries a CloseStatus; checking both makes sure the
                    // closed notification is raised whichever way the peer's close shows up.
                    if (result.CloseStatus.HasValue || result.MessageType == WebSocketMessageType.Close)
                    {
                        socket_Closed(this);
                        break;
                    }

                    if (result.MessageType == WebSocketMessageType.Text)
                    {
                        if (isDebug)
                        {
                            string msg = Encoding.UTF8.GetString(array, 0, result.Count);
                            Debug.Log("receive:" + result.Count + "==\n" + msg);
                        }
                        socket_MessageReceived(client, array, result.Count);
                    }

                    if (IsUnusable(client))
                    {
                        socket_Error(this, "State=" + (client != null ? client.State.ToString() : "null"));
                        break;
                    }
                }
            }
            catch (Exception e)
            {
                Debug.LogError(e);
                socket_Error(this, e.ToString());
            }
        }

        /// <summary>
        /// Drains <see cref="sendQueue"/> by writing each queued frame to
        /// <paramref name="client"/> as a text message.
        /// </summary>
        async Task startSending(ClientWebSocket client)
        {
            if (IsUnusable(client))
            {
                Debug.LogWarning("startSending=State=" + ((client != null) ? client.State.ToString() : ""));
                return;
            }

            isSending = true;
            try
            {
                while (sendQueue.Count > 0)
                {
                    if (IsUnusable(client))
                    {
                        socket_Error(this, "State=" + (client != null ? client.State.ToString() : "null"));
                        break;
                    }
                    var array = sendQueue.Dequeue();
                    // Send on the socket that was just checked, not on the mutable field.
                    await client.SendAsync(array, WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
            catch (Exception e)
            {
                Debug.LogError(e);
                socket_Error(this, e.ToString());
            }
            finally
            {
                isSending = false;
            }
        }

        /// <summary>Encodes <paramref name="content"/> as UTF-8, queues it and starts the send loop if idle.</summary>
        async void _send(string content)
        {
            try
            {
                if (isDebug)
                {
                    Debug.Log("发送数据====================\n" + content);
                }
                var array = new ArraySegment<byte>(Encoding.UTF8.GetBytes(content));
                sendQueue.Enqueue(array);
                if (!isSending)
                {
                    await startSending(this.socket);
                }
            }
            catch (Exception e)
            {
                Debug.LogError(e);
            }
        }

        /// <summary>Appends every entry of <paramref name="headers"/> as a "key:value" line; null is treated as empty.</summary>
        private static void AppendHeaders(StringBuilder target, Dictionary<string, string> headers)
        {
            if (headers == null)
            {
                return;
            }
            foreach (string key in headers.Keys)
            {
                target.Append(key).Append(":").Append(headers[key]).Append(LF);
            }
        }

        /// <summary>Appends the destination/content-length headers, the blank line, the body and the frame terminator.</summary>
        private void AppendSendTail(StringBuilder target, string destination, string content)
        {
            target.Append(DestinationHeader).Append(":").Append(destination).Append(LF)
                  .Append(ContentLengthHeader).Append(":").Append(GetByteCount(content)).Append(LF)
                  .Append(LF)
                  .Append(content)
                  .Append(NULL);
        }

        /// <summary>
        /// Sends a STOMP CONNECT frame unless already connected.
        /// </summary>
        /// <param name="headers">Extra headers; may be null.</param>
        /// <param name="callback">Callback attached to the CONNECTED frame.</param>
        public void Connect(Dictionary<string, string> headers, object callback)
        {
            if (this.connected)
            {
                return;
            }
            this.callbacks[CID] = callback;

            var data = new StringBuilder();
            data.Append(StompCommandEnum.CONNECT.ToString()).Append(LF);
            AppendHeaders(data, headers);
            data.Append(StompPrototypeVersion).Append(LF)
                .Append(HeartBeating).Append(LF)
                .Append(LF)
                .Append(NULL);
            _send(data.ToString());
        }

        /// <summary>Sends <paramref name="content"/> to <paramref name="destination"/> without extra headers.</summary>
        public void Send(string destination, string content)
        {
            this.Send(destination, new Dictionary<string, string>(), content);
        }

        /// <summary>Sends a STOMP SEND frame with the given headers (may be null) and body.</summary>
        public void Send(string destination, Dictionary<string, string> headers, string content)
        {
            var data = new StringBuilder();
            data.Append(StompCommandEnum.SEND.ToString()).Append(LF);
            AppendHeaders(data, headers);
            AppendSendTail(data, destination, content);
            _send(data.ToString());
        }

        /// <summary>Sends a STOMP SEND frame whose headers come from a Lua table (may be null).</summary>
        public void Send(string destination, LuaTable headers, string content)
        {
            var data = new StringBuilder();
            data.Append(StompCommandEnum.SEND.ToString()).Append(LF);
            if (headers != null)
            {
                // NOTE(review): key/value type arguments assumed to be <string, object> - confirm.
                headers.ForEach<string, object>((key, val) =>
                {
                    data.Append(key).Append(":").Append(val.ToString()).Append(LF);
                });
            }
            AppendSendTail(data, destination, content);
            _send(data.ToString());
        }

        /// <summary>Subscribes to <paramref name="destination"/> without extra headers.</summary>
        public void Subscribe(string destination, object callback)
        {
            this.Subscribe(destination, new Dictionary<string, string>(), callback);
        }

        /// <summary>
        /// Sends a SUBSCRIBE frame for <paramref name="destination"/> and registers
        /// <paramref name="callback"/> under a fresh "sub-N" id. Does nothing when the
        /// destination is already subscribed.
        /// </summary>
        public void Subscribe(string destination, Dictionary<string, string> headers, object callback)
        {
            lock (gate)
            {
                if (this.subscribes.ContainsKey(destination))
                {
                    return;
                }

                string id = "sub-" + COUNTER++;
                this.callbacks.Add(id, callback);
                this.subscribes.Add(destination, id);

                var data = new StringBuilder();
                data.Append(StompCommandEnum.SUBSCRIBE.ToString()).Append(LF)
                    .Append("id:").Append(id).Append(LF);
                AppendHeaders(data, headers); // null headers are tolerated, as in Connect/Send
                data.Append(DestinationHeader).Append(":").Append(destination).Append(LF)
                    .Append(LF)
                    .Append(NULL);
                _send(data.ToString());
            }
        }

        /// <summary>Sends an UNSUBSCRIBE frame for <paramref name="destination"/> and forgets its callback.</summary>
        public void UnSubscribe(string destination)
        {
            string id;
            if (!this.subscribes.TryGetValue(destination, out id))
            {
                return;
            }
            _send(StompCommandEnum.UNSUBSCRIBE.ToString() + LF + "id:" + id + LF + LF + NULL);
            this.callbacks.Remove(id);
            this.subscribes.Remove(destination);
        }

        /// <summary>Sends a DISCONNECT frame and marks the client as not connected.</summary>
        public void DisConnect()
        {
            _send(StompCommandEnum.DISCONNECT.ToString() + LF + LF + NULL);
            this.connected = false;
            isSending = false;
        }

        /// <summary>Returns true when <paramref name="destination"/> has an active subscription.</summary>
        public bool IsSubscribed(string destination)
        {
            return this.subscribes.ContainsKey(destination);
        }

        /// <summary>
        /// Number of bytes <paramref name="content"/> occupies when encoded as UTF-8
        /// (the value of the content-length header). Null counts as 0.
        /// </summary>
        private int GetByteCount(string content)
        {
            return string.IsNullOrEmpty(content) ? 0 : Encoding.UTF8.GetByteCount(content);
        }

        /// <summary>Parses one frame's text (without the NUL terminator) into a <see cref="StompFrame"/>.</summary>
        private StompFrame TransformResultFrame(string content)
        {
            lock (gate)
            {
                StompFrame frame = new StompFrame();
                this.HandleSingleLine(content, frame);
                return frame;
            }
        }

        /// <summary>
        /// Fills <paramref name="frame"/> from <paramref name="line"/>: the command, the
        /// "key:value" header lines and the body after the first blank line. When no blank
        /// line is present the frame is left untouched.
        /// </summary>
        private void HandleSingleLine(string line, StompFrame frame)
        {
            if (line == null)
            {
                return;
            }

            // EOLs in front of the command (heart-beats, padding after the previous frame)
            // are not part of the frame and would otherwise be parsed as an empty command.
            line = line.TrimStart(LF, '\r');

            int divider = line.IndexOf("" + LF + LF, StringComparison.Ordinal);
            if (divider < 0)
            {
                return;
            }

            string[] headerLines = line.Substring(0, divider).Split(LF);
            frame.Code = (StatusCodeEnum)Enum.Parse(typeof(StatusCodeEnum), headerLines[0]);

            for (int i = 1; i < headerLines.Length; i++)
            {
                int index = headerLines[i].IndexOf(':');
                if (index < 0)
                {
                    // Not a "key:value" line; skip it instead of failing the whole frame.
                    continue;
                }
                string key = headerLines[i].Substring(0, index);
                string value = headerLines[i].Substring(index + 1);
                frame.AddHeader(key.Trim(), value.Trim());
            }

            frame.Content = line.Substring(divider + 2);
        }

        /// <summary>Queues a SERVERERROR status frame carrying <paramref name="errMsg"/>.</summary>
        private void socket_Error(object sender, string errMsg)
        {
            if (isDebug)
            {
                Debug.LogWarning("socket_Error==" + errMsg);
            }
            queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERERROR, errMsg, null, statusCallback));
        }

        /// <summary>Releases the socket and queues a SERVERCLOSED status frame.</summary>
        void socket_Closed(object sender)
        {
            if (isDebug)
            {
                Debug.LogWarning("socket_Closed");
            }
            this.connected = false;
            ReleaseSocket();
            queueCallback.Enqueue(new StompFrame(StatusCodeEnum.SERVERCLOSED, "与服务器断开连接", null, statusCallback));
        }

        /// <summary>Queues an OPENSERVER status frame.</summary>
        private void socket_Opened(object sender)
        {
            queueCallback.Enqueue(new StompFrame(StatusCodeEnum.OPENSERVER, "成功连接到服务器", null, statusCallback));
        }

        /// <summary>Aborts and disposes the current socket, if any, and clears the field.</summary>
        private void ReleaseSocket()
        {
            if (socket != null)
            {
                socket.Abort();
                socket.Dispose();
                socket = null;
            }
        }

        /// <summary>
        /// Appends the received bytes to <see cref="receivedBuffer"/>, extracts every complete
        /// NUL-terminated frame, keeps the remaining bytes for the next call and dispatches
        /// the extracted frames.
        /// </summary>
        /// <param name="sender">Socket the data came from (unused).</param>
        /// <param name="bytes">Buffer holding the received data.</param>
        /// <param name="len">Number of valid bytes at the start of <paramref name="bytes"/>.</param>
        void socket_MessageReceived(object sender, byte[] bytes, int len)
        {
            receivedBuffer.Write(bytes, 0, len);
            byte[] pending = receivedBuffer.ToArray();
            int totalLen = pending.Length;
            if (isDebug)
            {
                Debug.LogWarning("=======取得的总数据===========" + totalLen);
                Debug.LogWarning(Encoding.UTF8.GetString(pending, 0, totalLen));
            }

            // Split on the raw NUL byte and decode only complete frames, so a multi-byte
            // UTF-8 character cut in half by a receive boundary is never decoded early.
            var frames = new List<string>();
            int start = 0;
            while (start < totalLen)
            {
                int terminator = Array.IndexOf(pending, (byte)0, start, totalLen - start);
                if (terminator < 0)
                {
                    // The remaining data is an incomplete frame.
                    break;
                }
                frames.Add(Encoding.UTF8.GetString(pending, start, terminator - start));
                start = terminator + 1;
            }

            int leftLen = totalLen - start;
            if (isDebug)
            {
                Debug.LogWarning("left len====" + leftLen);
            }

            if (start > 0)
            {
                // Truncate before rewriting, otherwise already-consumed bytes would still be
                // counted in the stream length and be re-read on the next call.
                receivedBuffer.Position = 0;
                receivedBuffer.SetLength(0);
                receivedBuffer.Write(pending, start, leftLen);
            }

            foreach (string content in frames)
            {
                DispatchFrame(content);
            }
        }

        /// <summary>Parses one frame and routes it according to its command.</summary>
        private void DispatchFrame(string content)
        {
            if (isDebug)
            {
                Debug.LogWarning("=======frame===========");
                Debug.LogWarning(content);
            }

            StompFrame frame = this.TransformResultFrame(content);
            object callback = null;
            switch (frame.Code)
            {
                case StatusCodeEnum.CONNECTED:
                    connected = true;
                    callbacks.TryGetValue(CID, out callback);
                    frame.callback = callback;
                    queueCallback.Enqueue(frame);
                    break;

                case StatusCodeEnum.MESSAGE:
                    string subscriptionId = frame.GetHeader(SubscriptionHeader);
                    if (subscriptionId != null)
                    {
                        callbacks.TryGetValue(subscriptionId, out callback);
                    }
                    frame.callback = callback;
                    queueCallback.Enqueue(frame);
                    break;

                case StatusCodeEnum.ERROR:
                    ReleaseSocket();
                    Debug.LogWarning(frame.Code);
                    break;

                default:
                    Debug.LogError(frame.Code);
                    break;
            }
        }

        /// <summary>Delivers at most one queued frame to its callback per Unity frame.</summary>
        private void Update()
        {
            if (queueCallback.Count == 0)
            {
                return;
            }
            StompFrame next = queueCallback.Dequeue();
            if (next != null)
            {
                Utl.doCallback(next.callback, next);
            }
        }

        /// <summary>
        /// Clears the receive buffer and, when connected, either releases an already dead
        /// socket or sends DISCONNECT on a live one, then drops all pending callbacks.
        /// </summary>
        public void close()
        {
            try
            {
                receivedBuffer.Position = 0;
                receivedBuffer.SetLength(0);
                if (!connected)
                {
                    return;
                }

                connected = false;
                isSending = false;
                if (isDebug)
                {
                    Debug.Log("close===");
                }

                if (IsUnusable(socket))
                {
                    if (socket != null)
                    {
                        if (isDebug)
                        {
                            Debug.LogWarning("socket.Abort");
                        }
                        ReleaseSocket();
                    }
                }
                else
                {
                    if (isDebug)
                    {
                        Debug.LogWarning("send DisConnect");
                    }
                    DisConnect();
                }
                queueCallback.Clear();
            }
            catch (Exception e)
            {
                Debug.LogError(e);
            }
        }

        private void OnDestroy()
        {
            close();
        }
    }
}