UDP 协议说明
对于有些使用场景以上的协议都不太适合。比如大数据量传输,音频视频传输,高实时性数据传输,或者网络环境不稳定的场景下的数据传输。这时候就需要使用UDP协议了。
UDP协议类似于使用UDP模拟了一个websocket协议。首先我们定义了几个操作码,用于区分不同的操作。
操作码 | 操作 |
---|---|
0x00 | OPCODE_CONTINUATION,连续数据,数据尚未传输完成 |
0x01 | OPCODE_TEXT,文本数据 |
0x02 | OPCODE_BINARY,二进制数据 |
0x08 | OPCODE_CLOSE_CONN,关闭连接 |
0x09 | OPCODE_PING,ping |
0x0A | OPCODE_PONG,pong |
1. 连接
第一帧数据格式如下
0 | 1-x | 结束位 |
---|---|---|
0x01, 标记此帧为连接数据 | udp://[host]:[port]/[path]?token=[token], 字符串数据,用于表示自己的连接信息,其中host为机器人ip,如果使用galileo_proxy则host表示机器人的以ID开头的注册地址。port为UDP端口号,默认和机器人websocket端口号一致,3547。path是你要订阅或发送的ros话题地址。token为机器人token,向后可以扩展添加其他参数,比如galileo_proxy_token。 | 0x00 |
如果连接成功,机器人会返回一帧数据,其格式为UTF-8编码的json字符串,如下
{
"status": true,
"client_id": []
}
其中status标志连接是否成功,client_id为机器人分配的客户端id,用于标识客户端。client_id为一个长度为4的数组,每个元素为一个字节,如[0, 0, 0, 1]。
2. 发送数据
成功建立连接后就可以继续发送数据帧了,数据帧格式如下
0 | 1-4 | 5 | 6 | 7-... |
---|---|---|---|---|
0x02, 标记此帧为数据帧 | client id | 数据类型比如OPCODE_BINARY表示发送的是二进制数据 | package_index,为uint32,表示数据包的序号,从0开始,每发送一帧数据加1 | 具体数据 |
同时需要注意的是,如果数据量较大,需要分包发送,每个包的数据长度不能超过1024字节,即每个包的数据长度不能超过1024-8=1016字节。一般采用512个字节作为最大长度。
注意建立连接后机器人会不断向客户端发送ping数据,客户端需要定时发送pong数据,以保持连接。同时客户端也可以发送ping数据,机器人会返回pong数据。一般为10hz左右。
3. 接收数据
接收到的数据帧和发送的数据帧格式一致。如果是文本数据,可以直接转换为字符串,如果是二进制数据,可以直接转换为字节数组。
客户端例子
using Newtonsoft.Json;
using SuperSocket.ClientEngine;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WebSocket4Net;
namespace ChiTuClient.libs
{
internal class UdpFastClient
{
byte OPCODE_CONTINUATION = 0x0;
byte OPCODE_TEXT = 0x1;
byte OPCODE_BINARY = 0x2;
byte OPCODE_CLOSE_CONN = 0x8;
byte OPCODE_PING = 0x9;
byte OPCODE_PONG = 0xA;
int MAX_DATA_LENGTH = 512;
private Socket serverSocket;
private EndPoint remotePoint;
private EventHandler onConnectedCB = null;
private EventHandler onDisconnectedCB = null;
private EventHandler<MessageReceivedEventArgs> onMessageReceivedCB = null;
private EventHandler<DataReceivedEventArgs> onDataReceivedCB = null;
private EventHandler<ErrorEventArgs> onErrorCB = null;
private string host;
private int port;
private string path;
private string token;
private byte[] clientID = null;
private object socketLock = new object();
long lastPongTime = 0;
private UInt32 packageIndex = 0;
public UdpFastClient(string host, int port, string path, string token)
{
// 创建UDP Client
serverSocket = new Socket
(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
serverSocket.ReceiveTimeout = 5000;
remotePoint = new IPEndPoint(IPAddress.Parse(host), port);
this.path = path;
this.host = host;
this.port = port;
this.token = token;
serverSocket.Bind(new IPEndPoint(IPAddress.Parse("0.0.0.0"), 0));
}
public UdpFastClient(string hostAndPort, string path, string token)
{
this.host = hostAndPort.Split(':')[0];
this.port = int.Parse(hostAndPort.Split(':')[1]);
this.path = path;
this.token = token;
serverSocket = new Socket
(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
serverSocket.ReceiveTimeout = 5000;
IPAddress hostIP;
int serverPort = port;
try
{
hostIP = IPAddress.Parse(host);
}
catch(Exception)
{
// WAN情况的连接,port应该是服务器的port
hostIP = Dns.GetHostAddresses(host)[0];
serverPort = 10427;
}
remotePoint = new IPEndPoint(hostIP, serverPort);
serverSocket.Bind(new IPEndPoint(IPAddress.Parse("0.0.0.0"), 0));
}
public void SendPing()
{
lock (socketLock)
{
if (clientID == null)
{
return;
}
List<byte> data = new List<byte>();
data.Add(0x02);
data.AddRange(clientID);
data.Add(OPCODE_PING);
try {
serverSocket.SendTo(data.ToArray(), remotePoint);
}catch (Exception e) { }
}
}
public void SendPong()
{
lock (socketLock)
{
if (clientID == null)
return;
List<byte> data = new List<byte>();
data.Add(0x02);
data.AddRange(clientID);
data.Add(OPCODE_PONG);
try {
serverSocket.SendTo(data.ToArray(), remotePoint);
}catch(Exception e) { }
}
}
public void Send(byte[] buf, int offset, int length)
{
lock (socketLock)
{
if (clientID == null)
return;
int currentIndex = offset;
List<byte> data = new List<byte>();
while (currentIndex + MAX_DATA_LENGTH < offset + length)
{
// 数据太大需要分割数据
data.Clear();
data.Add(0x02);
data.AddRange(clientID);
data.Add(OPCODE_BINARY);
data.AddRange(BitConverter.GetBytes(packageIndex));
packageIndex += 1;
data.AddRange(buf.Skip(currentIndex).Take(MAX_DATA_LENGTH));
try {
serverSocket.SendTo(data.ToArray(), remotePoint);
}catch(Exception e) { }
currentIndex = currentIndex + MAX_DATA_LENGTH;
}
data.Clear();
data.Add(0x02);
data.AddRange(clientID);
data.Add(OPCODE_BINARY);
data.AddRange(BitConverter.GetBytes(packageIndex));
packageIndex += 1;
data.AddRange(buf.Skip(currentIndex).Take(offset + length - currentIndex));
try {
serverSocket.SendTo(data.ToArray(), remotePoint);
}catch(Exception e) { }
}
}
public void Send(string msg)
{
lock (socketLock)
{
if (clientID == null)
return;
List<byte> data = new List<byte>();
data.Add(0x02);
data.AddRange(clientID);
data.Add(OPCODE_TEXT);
data.AddRange(Encoding.UTF8.GetBytes(msg));
if (data.Count > MAX_DATA_LENGTH)
MapUtils.Log("data too big");
try {
serverSocket.SendTo(data.ToArray(), remotePoint);
}catch(Exception e) { }
}
}
public void Start()
{
try
{
lock(socketLock)
{
// 发送第一帧数据
List<byte> data = new List<byte>();
data.Add(0x01);
data.AddRange(Encoding.UTF8.GetBytes($"udp://{host}:{port}/{path}?token={token}"));
data.Add((byte)'\0');
serverSocket.SendTo(data.ToArray(), remotePoint);
EndPoint RemoteIpEndPoint = new IPEndPoint(IPAddress.Any, 0);
byte[] receiveBytes = new byte[1024 * 2];
int length = serverSocket.ReceiveFrom(receiveBytes, ref RemoteIpEndPoint);
string receivedMsg = Encoding.UTF8.GetString(receiveBytes, 6, length);
UDPClientResponse res = JsonConvert.DeserializeObject<UDPClientResponse>(receivedMsg);
if (res.status)
{
clientID = res.client_id;
}
else
{
clientID = null;
}
}
if(clientID == null && onDisconnectedCB != null)
onDisconnectedCB(this, null);
}
catch (Exception ignore)
{
lock(socketLock)
{
clientID = null;
}
if (onDisconnectedCB != null)
onDisconnectedCB(this, null);
return;
}
if(onConnectedCB != null)
{
onConnectedCB(this, null);
}
// 开启读取线程
Task.Run(() => {
while (true)
{
lock(socketLock)
{
if (clientID == null)
return;
}
try
{
EndPoint RemoteIpEndPoint = new IPEndPoint(IPAddress.Any, 0);
byte[] receiveBytes = new byte[1024 * 2];
int length = serverSocket.ReceiveFrom(receiveBytes, ref RemoteIpEndPoint);
// 检测前几个字节是否正确
if (receiveBytes[0] != 2)
continue;
bool idCheckFlag = true;
lock(socketLock)
{
for (int i = 0; i < 4; i++)
{
if (clientID == null || receiveBytes[i + 1] != clientID[i])
{
idCheckFlag = false;
break;
}
}
}
if (!idCheckFlag)
continue;
// 任意类型的消息都可以更新pong时间
lastPongTime = GetTimeStamp();
// 处理各种数据类型
if (receiveBytes[5] == OPCODE_TEXT && onMessageReceivedCB != null)
{
string message = Encoding.UTF8.GetString(
receiveBytes, 6, length);
MessageReceivedEventArgs e = new MessageReceivedEventArgs(message);
onMessageReceivedCB(this, e);
}
if (receiveBytes[5] == OPCODE_BINARY && onDataReceivedCB != null)
{
DataReceivedEventArgs e = new DataReceivedEventArgs(
receiveBytes.Skip(6).Take(receiveBytes.Length - 6).ToArray());
onDataReceivedCB(this, e);
}
if (receiveBytes[5] == OPCODE_PING)
{
SendPong();
continue;
}
if (receiveBytes[5] == OPCODE_PONG || receiveBytes[5] == OPCODE_CONTINUATION)
{
continue;
}
if (receiveBytes[5] == OPCODE_CLOSE_CONN)
{
lock(socketLock)
{
clientID = null;
}
if (onDisconnectedCB != null)
onDisconnectedCB(this, null);
serverSocket.Close();
continue;
}
}
catch (Exception ignore)
{
}
}
});
Task.Run(() => {
// 定时发送ping
while (true)
{
lock(socketLock)
{
if (clientID == null)
return;
}
SendPing();
Thread.Sleep(100);
if (lastPongTime == 0)
lastPongTime = GetTimeStamp();
if (GetTimeStamp() - lastPongTime > 10 * 1000)
{
// 对面服务器可能已经下线
Stop();
}
}
});
}
public void Stop()
{
lock (socketLock)
{
if (clientID == null)
return;
try
{
List<byte> data = new List<byte>();
data.Add(0x02);
data.AddRange(clientID);
data.Add(0x08);
serverSocket.SendTo(data.ToArray(), remotePoint);
// 服务端将不会返回信息
clientID = null;
}
catch (Exception ignore)
{
return;
}
}
if (onDisconnectedCB != null)
onDisconnectedCB(this, null);
return;
}
public void OnOpen(EventHandler cb)
{
onConnectedCB = cb;
}
public void OnClose(EventHandler cb)
{
onDisconnectedCB = cb;
}
public void OnMessageReceived(EventHandler<MessageReceivedEventArgs> cb)
{
onMessageReceivedCB = cb;
}
public void OnError(EventHandler<ErrorEventArgs> cb)
{
onErrorCB = cb;
}
public void OnDataReceived(EventHandler<DataReceivedEventArgs> cb)
{
onDataReceivedCB = cb;
}
public bool IsConnected()
{
lock(socketLock)
{
return clientID != null;
}
}
public static long GetTimeStamp()
{
var timeSpan = (DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0));
return (long)timeSpan.TotalMilliseconds;
}
internal class UDPClientResponse
{
public bool status = false;
public byte[] client_id;
}
}
}
// 创建局域网连接
var client = new UdpFastClient("192.168.0.132:3547", "cmd_vel", token);
// 创建云端连接
var client = new UdpFastClient("AAAAAAAA.3547.robot1.bwbot.org:3547", "cmd_vel", token);
// 收到txt消息
client.OnMessageReceived(new EventHandler<MessageReceivedEventArgs>(GalileoMessageReceived));
// 收到二进制消息
client.OnDataReceived(new EventHandler<DataReceivedEventArgs>(GalileoDataReceived));
client.OnError(new EventHandler<ErrorEventArgs>(GalileoWebsocketOnError));
client.OnClose(new EventHandler(GalileoWebsocketOnClose));
client.OnOpen(new EventHandler(GalileoWebsocketOnOpen));
client.Start();
// 发送速度指令
TwistMsg msg = new TwistMsg(); // 根据TwistMsg定义的格式,构造消息
msg.linear.x = linear;
client.Send(JsonConvert.SerializeObject(msg));
// 此代码是在react native环境下运行的,需要安装react-native-udp
import UdpSocket from 'react-native-udp/lib/types/UdpSocket';
import dgram from '../components/native/UdpDgram';
import 'text-encoding-polyfill';
const OPCODE_CONTINUATION = 0x0;
const OPCODE_TEXT = 0x1;
const OPCODE_BINARY = 0x2;
const OPCODE_CLOSE_CONN = 0x8;
const OPCODE_PING = 0x9;
const OPCODE_PONG = 0xA;
const MAX_DATA_LENGTH = 512;
function Uint32ToUint8Array(value: number) {
return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF];
}
type UDPClientResponse = {
status: boolean,
client_id: Uint8Array
};
// 是否是有效的的ipv4地址
function isIPv4(ip: string) {
const reg = /^((25[0-5]|2[0-4]\d|[01]?\d\d?)($|(?!\.$)\.)){4}$/;
return reg.test(ip);
}
class UdpFastClient {
socket: UdpSocket;
host: string;
port: number;
path: string;
token: string;
clientID: Uint8Array | null;
onConnectedCB: (() => void) | null;
onDisconnectedCB: (() => void) | null;
onMessageReceivedCB: ((data: string) => void) | null;
onDataReceivedCB: ((data: Uint8Array) => void) | null;
onErrorCB: (() => void) | null;
lastPongTime: number;
packageIndex: number;
constructor(hostAndPort: string, path: string, token: string) {
this.host = hostAndPort.split(':')[0];
this.port = parseInt(hostAndPort.split(':')[1]);
// host 是否是有效ip地址
if (!isIPv4(this.host)) {
this.port = 10427;
}
this.socket = dgram.createSocket({ type: 'udp4', reusePort: true });
this.path = path
this.token = token;
this.clientID = null;
this.onConnectedCB = null;
this.onDisconnectedCB = null;
this.onMessageReceivedCB = null;
this.onDataReceivedCB = null;
this.onErrorCB = null;
this.lastPongTime = 0;
this.packageIndex = 0;
}
sendPing() {
if (this.clientID == null) {
return;
}
const data = [0x02, ...this.clientID, OPCODE_PING];
this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
if (typeof e != 'undefined') {
console.log("send ping error");
console.log(e);
}
});
}
sendPong() {
if (this.clientID == null) {
return;
}
const data = [0x02, ...this.clientID, OPCODE_PONG];
this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
if (typeof e != 'undefined') {
console.log("send pong error");
console.log(e);
}
});
}
sendBuf(buf: Uint8Array, offset: number, length: number) {
if (this.clientID == null) {
return;
}
let currentIndex = offset;
let data = []
while (currentIndex + MAX_DATA_LENGTH < offset + length) {
data = [0x02, ...this.clientID, OPCODE_BINARY, ...Uint32ToUint8Array(this.packageIndex)];
this.packageIndex += 1;
data = [...data, ...buf.slice(currentIndex, currentIndex + MAX_DATA_LENGTH)];
this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
if (typeof e != 'undefined') {
console.log("send buf error 1");
console.log(e);
}
});
currentIndex += MAX_DATA_LENGTH;
}
data = [0x02, ...this.clientID, OPCODE_BINARY, ...Uint32ToUint8Array(this.packageIndex)];
this.packageIndex += 1;
data = [...data, ...buf.slice(currentIndex, offset + length)];
this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
if (typeof e != 'undefined') {
console.log("send buf error 2");
console.log(e);
}
});
}
sendMsg(msg: string) {
if (this.clientID == null) {
return;
}
const data = [0x02, ...this.clientID, OPCODE_TEXT, ...(new TextEncoder().encode(msg))];
if (data.length > MAX_DATA_LENGTH) {
console.log("data too large");
}
this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
if (typeof e != 'undefined') {
console.log("send msg error");
console.log(e);
}
});
}
async start() {
await this.waitBind(0);
let data = [0x01, ...(new TextEncoder().encode(`udp://${this.host}:${this.port}/${this.path}?token=${this.token}`)),
'\0'.charCodeAt(0)];
this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
if (typeof e != 'undefined') {
console.log("send init error");
console.log(e);
}
});
try {
let receiveBytes = await this.receive();
// bytes to string
let receivedMsg = new TextDecoder().decode(receiveBytes.slice(6));
let res: UDPClientResponse = JSON.parse(receivedMsg);
if (res.status) {
this.clientID = res.client_id;
} else {
this.clientID = null;
}
} catch (e) {
this.clientID = null;
}
if (this.clientID == null && this.onDisconnectedCB != null) {
this.onDisconnectedCB();
return;
}
if (this.onConnectedCB != null) {
this.onConnectedCB();
}
this.socket.on('message', (receiveBytes: Uint8Array,
rinfo: { address: string, port: number }) => {
if (this.clientID == null) {
// 关闭socket
this.socket.close();
return;
}
if (receiveBytes.length < 2) {
return;
}
if (receiveBytes[0] != 0x02) {
return;
}
let idCheckFlag = true;
// 检测clientID
for (let i = 0; i < 4; i++) {
if (receiveBytes[i + 1] != this.clientID[i]) {
idCheckFlag = false;
break;
}
}
if (!idCheckFlag) {
return;
}
this.lastPongTime = new Date().valueOf();
if (receiveBytes[5] == OPCODE_TEXT && this.onMessageReceivedCB != null) {
const msg = new TextDecoder().decode(receiveBytes.slice(6));
this.onMessageReceivedCB(msg);
}
if (receiveBytes[5] == OPCODE_BINARY && this.onDataReceivedCB != null) {
this.onDataReceivedCB(receiveBytes.slice(6));
}
if (receiveBytes[5] == OPCODE_PING) {
this.sendPong();
}
if (receiveBytes[5] == OPCODE_PONG || receiveBytes[5] == OPCODE_CONTINUATION) {
return;
}
if (receiveBytes[5] == OPCODE_CLOSE_CONN) {
this.clientID = null;
if (this.onDisconnectedCB != null) {
this.onDisconnectedCB();
}
this.socket.close();
}
});
// 定时发送ping
const pingTimer = setInterval(() => {
if (this.clientID == null) {
clearInterval(pingTimer);
return;
}
this.sendPing();
if (this.lastPongTime == 0)
this.lastPongTime = new Date().valueOf();
if (new Date().valueOf() - this.lastPongTime > 10000) {
this.stop();
}
}, 100);
}
async receive() {
return new Promise<Uint8Array>((resolve, reject) => {
let resoleved = false;
const messageCB = (data: any, rinfo: { address: string, port: number }) => {
this.socket.off('message', messageCB);
resolve(data);
resoleved = true;
}
this.socket.on('message', messageCB);
setTimeout(() => {
if (!resoleved) {
this.socket.off('message', messageCB);
reject();
}
}, 5000);
});
}
async waitBind(port: number) {
return new Promise((resolve, reject) => {
// 等待socket绑定
let binded = false;
const bindCB = () => {
this.socket.off('listening', bindCB);
binded = true;
resolve(true);
}
this.socket.on('listening', bindCB);
this.socket.bind(port)
setTimeout(() => {
if (!binded) {
this.socket.off('listening', bindCB);
reject();
}
}, 5000);
})
}
stop() {
if (this.clientID == null) {
return;
}
const data = [0x02, ...this.clientID, OPCODE_CLOSE_CONN];
this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
if (typeof e != 'undefined') {
console.log("send close error");
console.log(e);
}
});
this.clientID = null;
if (this.onDisconnectedCB != null) {
this.onDisconnectedCB();
}
this.socket.close();
}
onOpen(cb: () => void) {
this.onConnectedCB = cb;
}
onClose(cb: () => void) {
this.onDisconnectedCB = cb;
}
onMessageReceived(cb: (msg: string) => void) {
this.onMessageReceivedCB = cb;
}
onDataReceived(cb: (data: Uint8Array) => void) {
this.onDataReceivedCB = cb;
}
onError(cb: () => void) {
this.onErrorCB = cb;
}
isConnected() {
return this.clientID != null;
}
}
export default UdpFastClient;
// 创建局域网连接
let udpclient = new UdpFastClient("192.168.0.132:3547", "cmd_vel", token);
// 创建公网连接
let udpclient = new UdpFastClient("aaaaaaa.3547.robot1.bwbot.org:3547", "cmd_vel", token);
udpclient.onOpen(() => {
options.onConnect && options.onConnect();
});
udpclient.onMessageReceived((data) => {
options.onMessage && options.onMessage(data);
});
udpclient.onDataReceived((data) => {
options.onData && options.onData(data);
});
udpclient.onError(() => {
options.onError && options.onError();
});
udpclient.onClose(() => {
options.onClose && options.onClose();
});
udpclient.start();