// Source file: tianrunCRM/Assets/trCRM/DistSpringWebsocketClient/Client.cs (2020-07-08 08:01:34 +08:00, 513 lines, 18 KiB, C#)

using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using Coolape;
using XLua;
using UnityEngine;
using System.Threading.Tasks;
using System.Collections;
using System.IO;
namespace Dist.SpringWebsocket
{
//public delegate void Receive(StompFrame frame);
public class Client : MonoBehaviour
{
/// <summary>
/// STOMP protocol versions this client supports; appended as a header line to the CONNECT frame.
/// </summary>
public static string StompPrototypeVersion = "accept-version:1.1,1.0";
/// <summary>
/// Heart-beat header line appended to the CONNECT frame.
/// </summary>
public static string HeartBeating = "heart-beat:10000,10000";
/// <summary>
/// Line feed character, used to build STOMP frames.
/// </summary>
public static Char LF = Convert.ToChar(10);
/// <summary>
/// NUL character, used to build STOMP frames (appended at the end of every outgoing frame).
/// </summary>
public static Char NULL = Convert.ToChar(0);
/// <summary>
/// Current connection type.
/// </summary>
public static string TYPE = "client";
// Bytes received from the socket that have not been turned into frames yet.
// NOTE(review): static, so it is shared by every Client instance — confirm that only one instance is ever live.
public static MemoryStream receivedBuffer = new MemoryStream();
// Frames (and status notifications) waiting to be handed to their callback from Update().
Queue<StompFrame> queueCallback = new Queue<StompFrame>();
// Name of the header on MESSAGE frames that is used to look up the subscription callback.
private static string SubscriptionHeader = "subscription";
// Header names written into outgoing SEND / SUBSCRIBE frames.
private static string DestinationHeader = "destination";
private static string ContentLengthHeader = "content-length";
// Key under which the Connect() callback is stored in `callbacks`.
private static string CID = "dist-connect";
// Callback objects keyed by CID or by subscription id ("sub-N"); created in init().
private Dictionary<string, object> callbacks;
// Callback that receives socket status frames (opened / closed / error); set in init().
private object statusCallback;
// Maps a subscribed destination to its subscription id; created in init().
private Dictionary<string, string> subscribes;
// The underlying WebSocket; created in init() and set to null once it has been aborted and disposed.
public ClientWebSocket socket;
// URL passed to init() (also settable through the URL property).
private string url;
// Counter used to generate subscription ids ("sub-" + COUNTER).
private static int COUNTER = 0;
// True once a CONNECTED frame has been received; reset by DisConnect(), close() and socket_Closed().
public Boolean connected = false;
// Encoded outgoing frames waiting to be written to the socket by startSending().
public Queue<ArraySegment<byte>> sendQueue = new Queue<ArraySegment<byte>>();
// Most recently constructed instance (assigned in the constructor).
public static Client self;
// True while startSending() is draining sendQueue; _send() only starts a new drain when this is false.
public bool isSending = false;
// Enables verbose logging of sent / received data and socket events.
public bool isDebug = false;
/// <summary>
/// Creates the client and publishes the new instance through the static <c>self</c> field.
/// </summary>
public Client()
{
    Client.self = this;
}
/// <summary>
/// Opens the WebSocket connection to <paramref name="url"/> and then runs the receive loop.
/// </summary>
/// <param name="url">WebSocket endpoint to connect to.</param>
/// <param name="callback">Callback object that receives status frames (opened / closed / error).</param>
/// <remarks>
/// The connection is awaited instead of blocked on with <c>Wait()</c>, so the calling thread is no
/// longer frozen while the handshake is in progress and failures are reported with the real
/// exception rather than a wrapping <see cref="AggregateException"/>. As a consequence this method
/// returns before the socket is open; wait for the "opened" status frame before sending.
/// Any failure is logged and forwarded to the status callback through <c>socket_Error</c>.
/// </remarks>
public async void init(string url, object callback)
{
    //close();
    this.url = url;
    this.statusCallback = callback;
    this.callbacks = new Dictionary<string, object>();
    this.subscribes = new Dictionary<string, string>();
    try
    {
        // Keep a local reference so the receive loop is bound to the socket created here,
        // even if the field is reassigned or cleared while we are awaiting.
        ClientWebSocket client = new ClientWebSocket();
        this.socket = client;
        await client.ConnectAsync(new Uri(url), CancellationToken.None);
        socket_Opened(this);
        await StartReceiving(client);
    }
    catch (Exception e)
    {
        Debug.LogError(e);
        socket_Error(this, e.ToString());
    }
}
/// <summary>
/// Gets or sets the stored endpoint URL. Setting it only updates the stored value.
/// </summary>
public string URL
{
    get
    {
        return this.url;
    }
    set
    {
        url = value;
    }
}
/// <summary>
/// Receive loop: reads messages from <paramref name="client"/> until the peer closes the
/// connection, the socket leaves a usable state, or an error occurs.
/// </summary>
/// <param name="client">The connected socket to read from.</param>
/// <remarks>
/// Text messages are forwarded to <c>socket_MessageReceived</c>; other non-close messages are ignored.
/// A close notification is reported through <c>socket_Closed</c>. The previous loop condition
/// (<c>!result.CloseStatus.HasValue</c>) ended the loop as soon as a result carried a close
/// status, so the close branch inside the loop could not run for such results and the closure
/// was never reported. Exceptions are logged and reported through <c>socket_Error</c>.
/// </remarks>
async Task StartReceiving(ClientWebSocket client)
{
    try
    {
        var array = new byte[1024 * 1024];
        ArraySegment<byte> arraySegment = new ArraySegment<byte>(array);
        while (true)
        {
            var result = await client.ReceiveAsync(arraySegment, CancellationToken.None);

            // Treat either signal as "peer closed": the message type or a populated close status.
            if (result.MessageType == WebSocketMessageType.Close || result.CloseStatus.HasValue)
            {
                socket_Closed(this);
                break;
            }

            if (result.MessageType == WebSocketMessageType.Text)
            {
                if (isDebug)
                {
                    string msg = Encoding.UTF8.GetString(array, 0, result.Count);
                    Debug.Log("receive:" + result.Count + "==\n" + msg);
                }
                socket_MessageReceived(client, array, result.Count);
            }

            // The message handler may have aborted the socket (e.g. on a STOMP ERROR frame).
            WebSocketState state = client.State;
            if (state == WebSocketState.Aborted
                || state == WebSocketState.CloseSent
                || state == WebSocketState.CloseReceived
                || state == WebSocketState.Closed)
            {
                socket_Error(this, "State=" + state);
                break;
            }
        }
    }
    catch (Exception e)
    {
        Debug.LogError(e);
        socket_Error(this, e.ToString());
    }
}
/// <summary>
/// Drains <c>sendQueue</c>, writing each queued payload to <paramref name="client"/> as a
/// complete text message, one at a time.
/// </summary>
/// <param name="client">Socket to send on; nothing is sent when it is null or no longer usable.</param>
/// <remarks>
/// The socket that is state-checked is now also the socket that is written to (previously the
/// check used <paramref name="client"/> but the write used the <c>socket</c> field, which may
/// have been cleared in the meantime). <c>isSending</c> is reset in a <c>finally</c> block so it
/// cannot stay stuck on any exit path. Failures are logged and reported through <c>socket_Error</c>.
/// </remarks>
async Task startSending(ClientWebSocket client)
{
    try
    {
        if (client == null || client.State == WebSocketState.Aborted
            || client.State == WebSocketState.CloseSent
            || client.State == WebSocketState.CloseReceived
            || client.State == WebSocketState.Closed)
        {
            Debug.LogWarning("startSending=State=" + ((client != null) ? client.State.ToString() : ""));
            return;
        }

        isSending = true;
        while (sendQueue.Count > 0)
        {
            // Re-check before every write: the socket can be closed between two sends.
            WebSocketState state = client.State;
            if (state == WebSocketState.Aborted
                || state == WebSocketState.CloseSent
                || state == WebSocketState.CloseReceived
                || state == WebSocketState.Closed)
            {
                socket_Error(this, "State=" + state);
                break;
            }

            ArraySegment<byte> payload = sendQueue.Dequeue();
            await client.SendAsync(payload, WebSocketMessageType.Text, true, CancellationToken.None);
        }
    }
    catch (Exception e)
    {
        Debug.LogError(e);
        socket_Error(this, e.ToString());
    }
    finally
    {
        isSending = false;
    }
}
/// <summary>
/// Encodes <paramref name="content"/> as UTF-8, queues it for sending and starts draining the
/// queue unless a drain is already in progress.
/// </summary>
/// <param name="content">Complete frame text to send.</param>
async void _send(string content)
{
    if (isDebug)
    {
        Debug.Log("send==\n" + content);
    }

    byte[] payload = Encoding.UTF8.GetBytes(content);
    sendQueue.Enqueue(new ArraySegment<byte>(payload));

    if (isSending)
    {
        // A running startSending loop will pick the new payload up.
        return;
    }
    await startSending(this.socket);
}
/// <summary>
/// Sends a STOMP CONNECT frame unless the client is already connected.
/// </summary>
/// <param name="headers">Extra header lines to include; may be null.</param>
/// <param name="callback">Callback object stored under the connect id for the CONNECTED reply.</param>
public void Connect(Dictionary<string, string> headers, object callback)
{
    if (this.connected)
    {
        return;
    }

    this.callbacks[CID] = callback;

    StringBuilder frame = new StringBuilder();
    frame.Append(StompCommandEnum.CONNECT.ToString()).Append(LF);
    if (headers != null)
    {
        foreach (KeyValuePair<string, string> header in headers)
        {
            frame.Append(header.Key).Append(":").Append(header.Value).Append(LF);
        }
    }
    // Protocol version and heart-beat headers, a blank line, then the frame terminator.
    frame.Append(StompPrototypeVersion).Append(LF);
    frame.Append(HeartBeating).Append(LF);
    frame.Append(LF).Append(NULL);

    _send(frame.ToString());
}
/// <summary>
/// Sends <paramref name="content"/> to <paramref name="destination"/> without extra headers.
/// </summary>
/// <param name="destination">Destination to send to.</param>
/// <param name="content">Frame body.</param>
public void Send(string destination, string content)
{
    Dictionary<string, string> noHeaders = new Dictionary<string, string>();
    Send(destination, noHeaders, content);
}
/// <summary>
/// Builds a STOMP SEND frame with the given headers, destination and body and queues it for sending.
/// </summary>
/// <param name="destination">Destination to send to.</param>
/// <param name="headers">Extra header lines; may be null.</param>
/// <param name="content">Frame body; its byte count is written as the content-length header.</param>
public void Send(string destination, Dictionary<string, string> headers, string content)
{
    StringBuilder frame = new StringBuilder();
    frame.Append(StompCommandEnum.SEND.ToString()).Append(LF);

    if (headers != null)
    {
        foreach (KeyValuePair<string, string> header in headers)
        {
            frame.Append(header.Key).Append(":").Append(header.Value).Append(LF);
        }
    }

    frame.Append(DestinationHeader).Append(":").Append(destination).Append(LF);
    frame.Append(ContentLengthHeader).Append(":").Append(GetByteCount(content)).Append(LF);
    // Blank line separates headers from the body; NUL terminates the frame.
    frame.Append(LF).Append(content).Append(NULL);

    _send(frame.ToString());
}
/// <summary>
/// Builds a STOMP SEND frame whose extra headers come from a Lua table and queues it for sending.
/// </summary>
/// <param name="destination">Destination to send to.</param>
/// <param name="headers">Lua table of header name/value pairs; may be null.</param>
/// <param name="content">Frame body; its byte count is written as the content-length header.</param>
public void Send(string destination, LuaTable headers, string content)
{
    StringBuilder frame = new StringBuilder();
    frame.Append(StompCommandEnum.SEND.ToString()).Append(LF);

    if (headers != null)
    {
        headers.ForEach<string, object>((name, value) =>
        {
            frame.Append(name).Append(":").Append(value.ToString()).Append(LF);
        });
    }

    frame.Append(DestinationHeader).Append(":").Append(destination).Append(LF);
    frame.Append(ContentLengthHeader).Append(":").Append(GetByteCount(content)).Append(LF);
    // Blank line separates headers from the body; NUL terminates the frame.
    frame.Append(LF).Append(content).Append(NULL);

    _send(frame.ToString());
}
/// <summary>
/// Subscribes to <paramref name="destination"/> without extra headers.
/// </summary>
/// <param name="destination">Destination to subscribe to.</param>
/// <param name="callback">Callback object stored for messages of this subscription.</param>
public void Subscribe(string destination, object callback)
{
    Dictionary<string, string> noHeaders = new Dictionary<string, string>();
    Subscribe(destination, noHeaders, callback);
}
/// <summary>
/// Subscribes to <paramref name="destination"/> by sending a STOMP SUBSCRIBE frame, unless a
/// subscription for that destination is already registered.
/// </summary>
/// <param name="destination">Destination to subscribe to.</param>
/// <param name="headers">
/// Extra header lines; may be null (a null value is now treated as "no extra headers", matching
/// Connect and Send, instead of throwing).
/// </param>
/// <param name="callback">Callback object stored under the generated subscription id.</param>
public void Subscribe(string destination, Dictionary<string, string> headers, object callback)
{
    // NOTE(review): lock (this) is kept to preserve the existing locking behaviour.
    lock (this)
    {
        if (this.subscribes.ContainsKey(destination))
        {
            return;
        }

        string id = "sub-" + COUNTER++;
        this.callbacks.Add(id, callback);
        this.subscribes.Add(destination, id);

        string data = StompCommandEnum.SUBSCRIBE.ToString() + LF + "id:" + id + LF;
        if (headers != null)
        {
            foreach (string key in headers.Keys)
            {
                data += key + ":" + headers[key] + LF;
            }
        }
        data += DestinationHeader + ":" + destination + LF + LF + NULL;
        _send(data);
    }
}
/// <summary>
/// Sends an UNSUBSCRIBE frame for <paramref name="destination"/> and forgets its callback.
/// Does nothing when the destination is not subscribed.
/// </summary>
/// <param name="destination">Destination to unsubscribe from.</param>
public void UnSubscribe(string destination)
{
    string subscriptionId;
    if (!this.subscribes.TryGetValue(destination, out subscriptionId))
    {
        return;
    }

    string frame = StompCommandEnum.UNSUBSCRIBE.ToString() + LF + "id:" + subscriptionId + LF + LF + NULL;
    _send(frame);

    this.callbacks.Remove(subscriptionId);
    this.subscribes.Remove(destination);
}
/// <summary>
/// Queues a STOMP DISCONNECT frame and marks the client as disconnected.
/// Registered subscriptions and callbacks are left untouched.
/// </summary>
public void DisConnect()
{
    string frame = StompCommandEnum.DISCONNECT.ToString() + LF + LF + NULL;
    _send(frame);

    // Reset the connection flags regardless of whether the frame has been written yet.
    this.connected = false;
    this.isSending = false;
}
/// <summary>
/// Tells whether a subscription is registered for <paramref name="destination"/>.
/// </summary>
/// <param name="destination">Destination to look up.</param>
/// <returns>True when the destination has a registered subscription id.</returns>
public bool IsSubscribed(string destination)
{
    bool registered = this.subscribes.ContainsKey(destination);
    return registered;
}
/// <summary>
/// Returns the number of bytes <paramref name="content"/> occupies when encoded as UTF-8,
/// which is the value written into the content-length header.
/// </summary>
/// <param name="content">Text to measure; null is treated as empty.</param>
/// <returns>The UTF-8 byte count, or 0 for null or empty input.</returns>
/// <remarks>
/// Replaces the previous URI-escape + regex-split counting trick, which threw on null input,
/// failed on long bodies that the URI escaping helper rejects, and allocated one string per byte.
/// </remarks>
private int GetByteCount(string content)
{
    if (string.IsNullOrEmpty(content))
    {
        return 0;
    }
    return Encoding.UTF8.GetByteCount(content);
}
/// <summary>
/// Parses the text of one received frame into a new <c>StompFrame</c>.
/// </summary>
/// <param name="content">Frame text without its trailing NUL terminator.</param>
/// <returns>The frame populated by <c>HandleSingleLine</c>.</returns>
private StompFrame TransformResultFrame(string content)
{
    lock (this)
    {
        StompFrame parsed = new StompFrame();
        HandleSingleLine(content, parsed);
        return parsed;
    }
}
/// <summary>
/// Parses one frame's text (command line, header lines, blank line, body) into <paramref name="frame"/>.
/// </summary>
/// <param name="line">Frame text without the trailing NUL; null or separator-less text leaves the frame untouched.</param>
/// <param name="frame">Frame object that receives the command code, headers and content.</param>
/// <remarks>
/// Leading end-of-line characters are skipped before parsing: a heart-beat is a bare EOL and EOLs
/// may follow a frame's NUL, so they can end up in front of the next command. Previously that
/// produced an empty command string and made the enum parse throw. Header lines without a ':'
/// are skipped instead of throwing. An unknown command still throws from <c>Enum.Parse</c>, as before.
/// </remarks>
private void HandleSingleLine(string line, StompFrame frame)
{
    if (line == null)
    {
        return;
    }

    line = line.TrimStart('\r', '\n');

    // Ordinal search: the separator is a protocol token, not linguistic text.
    int divider = line.IndexOf("" + LF + LF, StringComparison.Ordinal);
    if (divider < 0)
    {
        return;
    }

    string[] headerLines = line.Substring(0, divider).Split(LF);
    frame.Code = (StatusCodeEnum)Enum.Parse(typeof(StatusCodeEnum), headerLines[0]);

    for (int i = 1; i < headerLines.Length; i++)
    {
        int index = headerLines[i].IndexOf(':');
        if (index < 0)
        {
            // Malformed header line (no name/value separator): ignore it.
            continue;
        }
        string key = headerLines[i].Substring(0, index);
        string value = headerLines[i].Substring(index + 1);
        frame.AddHeader(Regex.Replace(key, @"^\s+|\s+$", ""), Regex.Replace(value, @"^\s+|\s+$", ""));
    }

    // Everything after the blank line is the body.
    frame.Content = line.Substring(divider + 2);
}
/// <summary>
/// Queues a server-error status frame carrying <paramref name="errMsg"/> for the status callback.
/// The connection state itself is not changed here.
/// </summary>
/// <param name="sender">Origin of the error (not used).</param>
/// <param name="errMsg">Error description passed on to the callback.</param>
private void socket_Error(object sender, string errMsg)
{
    if (isDebug)
    {
        Debug.LogWarning("socket_Error==" + errMsg);
    }

    StompFrame errorFrame = new StompFrame(StatusCodeEnum.SERVERERROR, errMsg, null, statusCallback);
    queueCallback.Enqueue(errorFrame);
}
/// <summary>
/// Marks the client as disconnected, tears down the socket if one exists and queues a
/// server-closed status frame for the status callback.
/// </summary>
/// <param name="sender">Origin of the notification (not used).</param>
void socket_Closed(object sender)
{
    if (isDebug)
    {
        Debug.LogWarning("socket_Closed");
    }

    connected = false;

    ClientWebSocket closing = socket;
    if (closing != null)
    {
        closing.Abort();
        closing.Dispose();
        socket = null;
    }

    StompFrame closedFrame = new StompFrame(StatusCodeEnum.SERVERCLOSED, "与服务器断开连接", null, statusCallback);
    queueCallback.Enqueue(closedFrame);
}
/// <summary>
/// Queues a server-opened status frame for the status callback.
/// </summary>
/// <param name="sender">Origin of the notification (not used).</param>
private void socket_Opened(object sender)
{
    StompFrame openedFrame = new StompFrame(StatusCodeEnum.OPENSERVER, "成功连接到服务器", null, statusCallback);
    queueCallback.Enqueue(openedFrame);
}
/// <summary>
/// Maximum number of bytes that may stay buffered while waiting for a frame terminator.
/// </summary>
public static int __maxLen = 1024 * 1024;

/// <summary>
/// Appends newly received bytes to <c>receivedBuffer</c>, extracts every complete
/// (NUL-terminated) frame and dispatches it, and keeps the bytes of a trailing incomplete frame
/// for the next call.
/// </summary>
/// <param name="sender">Origin of the data (not used).</param>
/// <param name="bytes">Buffer holding the received data.</param>
/// <param name="len">Number of valid bytes at the start of <paramref name="bytes"/>.</param>
/// <exception cref="InvalidOperationException">
/// More than <c>__maxLen</c> bytes are buffered without a frame terminator; the buffer is cleared first.
/// </exception>
/// <remarks>
/// Frames are split on the raw bytes. The previous implementation decoded the whole buffer to a
/// string, re-encoded the remainder to learn its length and then copied that many bytes from the
/// START of the buffer, so a partial frame that followed a complete one was replaced by stale
/// data; a multi-byte character cut at the receive boundary was corrupted as well. A zero byte
/// never occurs inside a UTF-8 multi-byte sequence, so scanning bytes for the terminator is safe.
/// </remarks>
void socket_MessageReceived(object sender, byte[] bytes, int len)
{
    // Always append behind whatever incomplete frame is still buffered.
    receivedBuffer.Position = receivedBuffer.Length;
    receivedBuffer.Write(bytes, 0, len);

    int totalLen = (int)receivedBuffer.Length;
    byte[] data = receivedBuffer.GetBuffer();
    int consumed = 0;
    try
    {
        while (consumed < totalLen)
        {
            int terminator = Array.IndexOf(data, (byte)0, consumed, totalLen - consumed);
            if (terminator < 0)
            {
                // The remaining bytes do not form a complete frame yet.
                break;
            }

            string content = Encoding.UTF8.GetString(data, consumed, terminator - consumed);
            consumed = terminator + 1;
            if (isDebug)
            {
                Debug.LogWarning(content);
            }
            dispatchFrame(this.TransformResultFrame(content));
        }
    }
    finally
    {
        // Drop the consumed frames even when a handler threw, so they are never parsed twice.
        int leftLen = totalLen - consumed;
        if (leftLen > 0 && consumed > 0)
        {
            Buffer.BlockCopy(data, consumed, data, 0, leftLen);
        }
        receivedBuffer.SetLength(leftLen);
        receivedBuffer.Position = leftLen;
    }

    if (receivedBuffer.Length > __maxLen)
    {
        long pending = receivedBuffer.Length;
        receivedBuffer.Position = 0;
        receivedBuffer.SetLength(0);
        throw new InvalidOperationException("Received " + pending + " bytes without a frame terminator (limit " + __maxLen + ")");
    }
}

/// <summary>
/// Routes one parsed frame: CONNECTED and MESSAGE frames are queued with their callback, an ERROR
/// frame tears the socket down, anything else is logged as an error.
/// </summary>
/// <param name="frame">Frame produced by <c>TransformResultFrame</c>.</param>
private void dispatchFrame(StompFrame frame)
{
    object callback;
    switch (frame.Code)
    {
        case StatusCodeEnum.CONNECTED:
            connected = true;
            callbacks.TryGetValue(CID, out callback);
            frame.callback = callback;
            queueCallback.Enqueue(frame);
            break;
        case StatusCodeEnum.MESSAGE:
            callback = null;
            string subscriptionId = frame.GetHeader(SubscriptionHeader);
            if (subscriptionId != null)
            {
                // A dictionary lookup with a null key would throw; a frame without the header gets no callback.
                callbacks.TryGetValue(subscriptionId, out callback);
            }
            frame.callback = callback;
            queueCallback.Enqueue(frame);
            break;
        case StatusCodeEnum.ERROR:
            if (socket != null)
            {
                socket.Abort();
                socket.Dispose();
                socket = null;
            }
            Debug.LogWarning(frame.Code);
            break;
        default:
            Debug.LogError(frame.Code);
            break;
    }
}
//===============================================
// Frame currently being delivered by Update().
StompFrame _stompframe;

/// <summary>
/// Delivers at most one queued frame per call to its callback through <c>Utl.doCallback</c>.
/// </summary>
private void Update()
{
    if (queueCallback.Count <= 0)
    {
        return;
    }

    _stompframe = queueCallback.Dequeue();
    if (_stompframe == null)
    {
        return;
    }

    Utl.doCallback(_stompframe.callback, _stompframe);
}
/// <summary>
/// Clears the receive buffer and, when the client is connected, shuts the session down:
/// a usable socket gets a DISCONNECT frame, an unusable one is aborted and disposed.
/// Pending callbacks are discarded. Exceptions are logged and swallowed.
/// </summary>
public void close()
{
    try
    {
        receivedBuffer.Position = 0;
        receivedBuffer.SetLength(0);

        if (!connected)
        {
            return;
        }

        connected = false;
        isSending = false;
        if (isDebug)
        {
            Debug.Log("close===");
        }

        bool socketUsable = socket != null
            && socket.State != WebSocketState.Aborted
            && socket.State != WebSocketState.CloseSent
            && socket.State != WebSocketState.CloseReceived
            && socket.State != WebSocketState.Closed;

        if (socketUsable)
        {
            if (isDebug)
            {
                Debug.LogWarning("send DisConnect");
            }
            DisConnect();
        }
        else if (socket != null)
        {
            if (isDebug)
            {
                Debug.LogWarning("socket.Abort");
            }
            socket.Abort();
            socket.Dispose();
            socket = null;
        }

        queueCallback.Clear();
    }
    catch (Exception e)
    {
        Debug.LogError(e);
    }
}
/// <summary>
/// Shuts the client down through <c>close()</c> when this component is destroyed.
/// </summary>
private void OnDestroy()
{
    this.close();
}
}
}