UDP 协议说明

对于有些使用场景以上的协议都不太适合。比如大数据量传输,音频视频传输,高实时性数据传输,或者网络环境不稳定的场景下的数据传输。这时候就需要使用UDP协议了。

UDP协议类似于使用UDP模拟了一个websocket协议。首先我们定义了几个操作码,用于区分不同的操作。

操作码 操作
0x00 OPCODE_CONTINUATION,连续数据,数据尚未传输完成
0x01 OPCODE_TEXT,文本数据
0x02 OPCODE_BINARY,二进制数据
0x08 OPCODE_CLOSE_CONN,关闭连接
0x09 OPCODE_PING,ping
0x0A OPCODE_PONG,pong

1. 连接

第一帧数据格式如下

0 1-x 结束位
0x01, 标记此帧为连接数据 udp://[host]:[port]/[path]?token=[token], 字符串数据,用于表示自己的连接信息,其中host为机器人ip,如果使用galileo_proxy则host表示机器人的以ID开头的注册地址。port为UDP端口号,默认和机器人websocket端口号一致,3547。path是你要订阅或发送的ros话题地址。token为机器人token,向后可以扩展添加其他参数,比如galileo_proxy_token。 0x00

如果连接成功,机器人会返回一帧数据,其格式为UTF-8编码的json字符串,如下

{
    "status": true,
    "client_id": []
}

其中status标志连接是否成功,client_id为机器人分配的客户端id,用于标识客户端。client_id为一个长度为4的数组,每个元素为一个字节,如[0, 0, 0, 1]。

2. 发送数据

成功建立连接后就可以继续发送数据帧了,数据帧格式如下

0 1-4 5 6 7-...
0x02, 标记此帧为数据帧 client id 数据类型比如OPCODE_BINARY表示发送的是二进制数据 package_index,为uint32,表示数据包的序号,从0开始,每发送一帧数据加1 具体数据

同时需要注意的是,如果数据量较大,需要分包发送,每个包的数据长度不能超过1024字节,即每个包的数据长度不能超过1024-8=1016字节。一般采用512个字节作为最大长度。

注意建立连接后机器人会不断向客户端发送ping数据,客户端需要定时发送pong数据,以保持连接。同时客户端也可以发送ping数据,机器人会返回pong数据。一般为10hz左右。

3. 接收数据

接收到的数据帧和发送的数据帧格式一致。如果是文本数据,可以直接转换为字符串,如果是二进制数据,可以直接转换为字节数组。

客户端例子

using Newtonsoft.Json;
using SuperSocket.ClientEngine;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WebSocket4Net;

namespace ChiTuClient.libs
{

    internal class UdpFastClient
    {

        byte OPCODE_CONTINUATION = 0x0;
        byte OPCODE_TEXT = 0x1;
        byte OPCODE_BINARY = 0x2;
        byte OPCODE_CLOSE_CONN = 0x8;
        byte OPCODE_PING = 0x9;
        byte OPCODE_PONG = 0xA;
        int MAX_DATA_LENGTH = 512;


        private Socket serverSocket;
        private EndPoint remotePoint;
        private EventHandler onConnectedCB = null;
        private EventHandler onDisconnectedCB = null;
        private EventHandler<MessageReceivedEventArgs> onMessageReceivedCB = null;
        private EventHandler<DataReceivedEventArgs> onDataReceivedCB = null;
        private EventHandler<ErrorEventArgs> onErrorCB = null;
        private string host;
        private int port;
        private string path;
        private string token;
        private byte[] clientID = null;
        private object socketLock = new object();
        long lastPongTime = 0;
        private UInt32 packageIndex = 0;
        public UdpFastClient(string host, int port, string path, string token)
        {
            // 创建UDP Client
            serverSocket = new Socket
                (AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            serverSocket.ReceiveTimeout = 5000;
            remotePoint = new IPEndPoint(IPAddress.Parse(host), port);
            this.path = path;
            this.host = host;
            this.port = port;
            this.token = token;
            serverSocket.Bind(new IPEndPoint(IPAddress.Parse("0.0.0.0"), 0));
        }

        public UdpFastClient(string hostAndPort, string path, string token)
        {
            this.host = hostAndPort.Split(':')[0];
            this.port = int.Parse(hostAndPort.Split(':')[1]);
            this.path = path;
            this.token = token;
            serverSocket = new Socket
                (AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            serverSocket.ReceiveTimeout = 5000;
            IPAddress hostIP;
            int serverPort = port;
            try
            {
                hostIP = IPAddress.Parse(host);
            }
            catch(Exception)
            {
                // WAN情况的连接,port应该是服务器的port
                hostIP = Dns.GetHostAddresses(host)[0];
                serverPort = 10427;
            }
            remotePoint = new IPEndPoint(hostIP, serverPort);
            serverSocket.Bind(new IPEndPoint(IPAddress.Parse("0.0.0.0"), 0));
        }

        public void SendPing()
        {
            lock (socketLock)
            {
                if (clientID == null)
                {
                    return;
                }
                List<byte> data = new List<byte>();
                data.Add(0x02);
                data.AddRange(clientID);
                data.Add(OPCODE_PING);
                try {
                    serverSocket.SendTo(data.ToArray(), remotePoint);
                }catch (Exception e) { }

            }
        }

        public void SendPong()
        {
            lock (socketLock)
            {
                if (clientID == null)
                    return;
                List<byte> data = new List<byte>();
                data.Add(0x02);
                data.AddRange(clientID);
                data.Add(OPCODE_PONG);
                try {
                    serverSocket.SendTo(data.ToArray(), remotePoint);
                }catch(Exception e) { }
            }
        }

        public void Send(byte[] buf, int offset, int length)
        {
            lock (socketLock)
            {
                if (clientID == null)
                    return;
                int currentIndex = offset;
                List<byte> data = new List<byte>();
                while (currentIndex + MAX_DATA_LENGTH < offset + length)
                {
                    // 数据太大需要分割数据
                    data.Clear();
                    data.Add(0x02);
                    data.AddRange(clientID);
                    data.Add(OPCODE_BINARY);
                    data.AddRange(BitConverter.GetBytes(packageIndex));
                    packageIndex += 1;
                    data.AddRange(buf.Skip(currentIndex).Take(MAX_DATA_LENGTH));
                    try {
                        serverSocket.SendTo(data.ToArray(), remotePoint);
                    }catch(Exception e) { }
                    currentIndex = currentIndex + MAX_DATA_LENGTH;
                }
                data.Clear();
                data.Add(0x02);
                data.AddRange(clientID);
                data.Add(OPCODE_BINARY);
                data.AddRange(BitConverter.GetBytes(packageIndex));
                packageIndex += 1;
                data.AddRange(buf.Skip(currentIndex).Take(offset + length - currentIndex));
                try {
                    serverSocket.SendTo(data.ToArray(), remotePoint);
                }catch(Exception e) { }

            }
        }

        public void Send(string msg)
        {
            lock (socketLock)
            {
                if (clientID == null)
                    return;
                List<byte> data = new List<byte>();
                data.Add(0x02);
                data.AddRange(clientID);
                data.Add(OPCODE_TEXT);
                data.AddRange(Encoding.UTF8.GetBytes(msg));
                if (data.Count > MAX_DATA_LENGTH)
                    MapUtils.Log("data too big");
                try {
                    serverSocket.SendTo(data.ToArray(), remotePoint);
                }catch(Exception e) { }
            }
        }

        public void Start()
        {

            try
            {
                lock(socketLock)
                {
                    // 发送第一帧数据
                    List<byte> data = new List<byte>();
                    data.Add(0x01);
                    data.AddRange(Encoding.UTF8.GetBytes($"udp://{host}:{port}/{path}?token={token}"));
                    data.Add((byte)'\0');
                    serverSocket.SendTo(data.ToArray(), remotePoint);
                    EndPoint RemoteIpEndPoint = new IPEndPoint(IPAddress.Any, 0);
                    byte[] receiveBytes = new byte[1024 * 2];
                    int length = serverSocket.ReceiveFrom(receiveBytes, ref RemoteIpEndPoint);
                    string receivedMsg = Encoding.UTF8.GetString(receiveBytes, 6, length);
                    UDPClientResponse res = JsonConvert.DeserializeObject<UDPClientResponse>(receivedMsg);
                    if (res.status)
                    {
                        clientID = res.client_id;
                    }
                    else
                    {
                        clientID = null;
                    }
                }
                if(clientID == null && onDisconnectedCB != null)
                    onDisconnectedCB(this, null);
            }
            catch (Exception ignore)
            {
                lock(socketLock)
                {
                    clientID = null;
                }
                if (onDisconnectedCB != null)
                    onDisconnectedCB(this, null);
                return;
            }
            if(onConnectedCB != null)
            {
                onConnectedCB(this, null);
            }
            // 开启读取线程
            Task.Run(() => {
                while (true)
                {
                    lock(socketLock)
                    {
                        if (clientID == null)
                            return;
                    }
                    try
                    {
                        EndPoint RemoteIpEndPoint = new IPEndPoint(IPAddress.Any, 0);
                        byte[] receiveBytes = new byte[1024 * 2];
                        int length = serverSocket.ReceiveFrom(receiveBytes, ref RemoteIpEndPoint);
                        // 检测前几个字节是否正确
                        if (receiveBytes[0] != 2)
                            continue;
                        bool idCheckFlag = true;
                        lock(socketLock)
                        {
                            for (int i = 0; i < 4; i++)
                            {
                                if (clientID == null || receiveBytes[i + 1] != clientID[i])
                                {
                                    idCheckFlag = false;
                                    break;
                                }
                            }
                        }
                        if (!idCheckFlag)
                            continue;
                        // 任意类型的消息都可以更新pong时间
                        lastPongTime = GetTimeStamp();
                        // 处理各种数据类型
                        if (receiveBytes[5] == OPCODE_TEXT && onMessageReceivedCB != null)
                        {
                            string message = Encoding.UTF8.GetString(
                                receiveBytes, 6, length);
                            MessageReceivedEventArgs e = new MessageReceivedEventArgs(message);
                            onMessageReceivedCB(this, e);
                        }
                        if (receiveBytes[5] == OPCODE_BINARY && onDataReceivedCB != null)
                        {
                            DataReceivedEventArgs e = new DataReceivedEventArgs(
                                receiveBytes.Skip(6).Take(receiveBytes.Length - 6).ToArray());
                            onDataReceivedCB(this, e);
                        }
                        if (receiveBytes[5] == OPCODE_PING)
                        {
                            SendPong();
                            continue;
                        }
                        if (receiveBytes[5] == OPCODE_PONG || receiveBytes[5] == OPCODE_CONTINUATION)
                        {
                            continue;
                        }
                        if (receiveBytes[5] == OPCODE_CLOSE_CONN)
                        {
                            lock(socketLock)
                            {
                                clientID = null;
                            }
                            if (onDisconnectedCB != null)
                                onDisconnectedCB(this, null);
                            serverSocket.Close();
                            continue;
                        }
                    }
                    catch (Exception ignore)
                    {

                    }
                }
            });
            Task.Run(() => {
                // 定时发送ping
                while (true)
                {
                    lock(socketLock)
                    {
                        if (clientID == null)
                            return;
                    }
                    SendPing();
                    Thread.Sleep(100);
                    if (lastPongTime == 0)
                        lastPongTime = GetTimeStamp();
                    if (GetTimeStamp() - lastPongTime > 10 * 1000)
                    {
                        // 对面服务器可能已经下线
                        Stop();
                    }
                }
            });
        }

        public void Stop()
        {
            lock (socketLock)
            {
                if (clientID == null)
                    return;
                try
                {
                    List<byte> data = new List<byte>();
                    data.Add(0x02);
                    data.AddRange(clientID);
                    data.Add(0x08);
                    serverSocket.SendTo(data.ToArray(), remotePoint);
                    // 服务端将不会返回信息
                    clientID = null;
                }
                catch (Exception ignore)
                {
                    return;
                }
            }
            if (onDisconnectedCB != null)
                onDisconnectedCB(this, null);
            return;
        }

        public void OnOpen(EventHandler cb)
        {
            onConnectedCB = cb;
        }

        public void OnClose(EventHandler cb)
        {
            onDisconnectedCB = cb;
        }

        public void OnMessageReceived(EventHandler<MessageReceivedEventArgs> cb)
        {
            onMessageReceivedCB = cb;
        }

        public void OnError(EventHandler<ErrorEventArgs> cb)
        {
            onErrorCB = cb;
        }

        public void OnDataReceived(EventHandler<DataReceivedEventArgs> cb)
        {
            onDataReceivedCB = cb;
        }

        public bool IsConnected()
        {
            lock(socketLock)
            {
                return clientID != null;
            }
        }

        public static long GetTimeStamp()
        {
            var timeSpan = (DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0));
            return (long)timeSpan.TotalMilliseconds;
        }

        internal class UDPClientResponse
        {
            public bool status = false;
            public byte[] client_id;
        }
    }
}

// 创建局域网连接
var client = new UdpFastClient("192.168.0.132:3547", "cmd_vel", token);
// 创建云端连接
var client = new UdpFastClient("AAAAAAAA.3547.robot1.bwbot.org:3547", "cmd_vel", token);
// 收到txt消息
client.OnMessageReceived(new EventHandler<MessageReceivedEventArgs>(GalileoMessageReceived));
// 收到二进制消息
client.OnDataReceived(new EventHandler<DataReceivedEventArgs>(GalileoDataReceived));
client.OnError(new EventHandler<ErrorEventArgs>(GalileoWebsocketOnError));
client.OnClose(new EventHandler(GalileoWebsocketOnClose));
client.OnOpen(new EventHandler(GalileoWebsocketOnOpen));
client.Start();


// 发送速度指令
TwistMsg msg = new TwistMsg(); // 根据TwistMsg定义的格式,构造消息
msg.linear.x = linear;
client.Send(JsonConvert.SerializeObject(msg));
// 此代码是在react native环境下运行的,需要安装react-native-udp
import UdpSocket from 'react-native-udp/lib/types/UdpSocket';
import dgram from '../components/native/UdpDgram';
import 'text-encoding-polyfill';
const OPCODE_CONTINUATION = 0x0;
const OPCODE_TEXT = 0x1;
const OPCODE_BINARY = 0x2;
const OPCODE_CLOSE_CONN = 0x8;
const OPCODE_PING = 0x9;
const OPCODE_PONG = 0xA;
const MAX_DATA_LENGTH = 512;

function Uint32ToUint8Array(value: number) {
    return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF];
}

type UDPClientResponse = {
    status: boolean,
    client_id: Uint8Array
};


// 是否是有效的的ipv4地址
function isIPv4(ip: string) {
    const reg = /^((25[0-5]|2[0-4]\d|[01]?\d\d?)($|(?!\.$)\.)){4}$/;
    return reg.test(ip);
}


class UdpFastClient {
    socket: UdpSocket;
    host: string;
    port: number;
    path: string;
    token: string;
    clientID: Uint8Array | null;
    onConnectedCB: (() => void) | null;
    onDisconnectedCB: (() => void) | null;
    onMessageReceivedCB: ((data: string) => void) | null;
    onDataReceivedCB: ((data: Uint8Array) => void) | null;
    onErrorCB: (() => void) | null;
    lastPongTime: number;
    packageIndex: number;
    constructor(hostAndPort: string, path: string, token: string) {
        this.host = hostAndPort.split(':')[0];
        this.port = parseInt(hostAndPort.split(':')[1]);
        // host 是否是有效ip地址
        if (!isIPv4(this.host)) {
            this.port = 10427;
        }
        this.socket = dgram.createSocket({ type: 'udp4', reusePort: true });
        this.path = path
        this.token = token;
        this.clientID = null;
        this.onConnectedCB = null;
        this.onDisconnectedCB = null;
        this.onMessageReceivedCB = null;
        this.onDataReceivedCB = null;
        this.onErrorCB = null;
        this.lastPongTime = 0;
        this.packageIndex = 0;
    }



    sendPing() {
        if (this.clientID == null) {
            return;
        }
        const data = [0x02, ...this.clientID, OPCODE_PING];
        this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
            if (typeof e != 'undefined') {
                console.log("send ping error");
                console.log(e);
            }
        });
    }

    sendPong() {
        if (this.clientID == null) {
            return;
        }
        const data = [0x02, ...this.clientID, OPCODE_PONG];
        this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
            if (typeof e != 'undefined') {
                console.log("send pong error");
                console.log(e);
            }
        });
    }

    sendBuf(buf: Uint8Array, offset: number, length: number) {
        if (this.clientID == null) {
            return;
        }
        let currentIndex = offset;
        let data = []
        while (currentIndex + MAX_DATA_LENGTH < offset + length) {
            data = [0x02, ...this.clientID, OPCODE_BINARY, ...Uint32ToUint8Array(this.packageIndex)];
            this.packageIndex += 1;
            data = [...data, ...buf.slice(currentIndex, currentIndex + MAX_DATA_LENGTH)];
            this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
                if (typeof e != 'undefined') {
                    console.log("send buf error 1");
                    console.log(e);
                }
            });
            currentIndex += MAX_DATA_LENGTH;
        }
        data = [0x02, ...this.clientID, OPCODE_BINARY, ...Uint32ToUint8Array(this.packageIndex)];
        this.packageIndex += 1;
        data = [...data, ...buf.slice(currentIndex, offset + length)];
        this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
            if (typeof e != 'undefined') {
                console.log("send buf error 2");
                console.log(e);
            }
        });
    }

    sendMsg(msg: string) {
        if (this.clientID == null) {
            return;
        }
        const data = [0x02, ...this.clientID, OPCODE_TEXT, ...(new TextEncoder().encode(msg))];
        if (data.length > MAX_DATA_LENGTH) {
            console.log("data too large");
        }
        this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
            if (typeof e != 'undefined') {
                console.log("send msg error");
                console.log(e);
            }
        });
    }

    async start() {
        await this.waitBind(0);
        let data = [0x01, ...(new TextEncoder().encode(`udp://${this.host}:${this.port}/${this.path}?token=${this.token}`)),
            '\0'.charCodeAt(0)];
        this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
            if (typeof e != 'undefined') {
                console.log("send init error");
                console.log(e);
            }
        });
        try {
            let receiveBytes = await this.receive();
            // bytes to string
            let receivedMsg = new TextDecoder().decode(receiveBytes.slice(6));
            let res: UDPClientResponse = JSON.parse(receivedMsg);
            if (res.status) {
                this.clientID = res.client_id;
            } else {
                this.clientID = null;
            }
        } catch (e) {
            this.clientID = null;
        }
        if (this.clientID == null && this.onDisconnectedCB != null) {
            this.onDisconnectedCB();
            return;
        }
        if (this.onConnectedCB != null) {
            this.onConnectedCB();
        }
        this.socket.on('message', (receiveBytes: Uint8Array,
            rinfo: { address: string, port: number }) => {
            if (this.clientID == null) {
                // 关闭socket
                this.socket.close();
                return;
            }
            if (receiveBytes.length < 2) {
                return;
            }
            if (receiveBytes[0] != 0x02) {
                return;
            }
            let idCheckFlag = true;
            // 检测clientID
            for (let i = 0; i < 4; i++) {
                if (receiveBytes[i + 1] != this.clientID[i]) {
                    idCheckFlag = false;
                    break;
                }
            }
            if (!idCheckFlag) {
                return;
            }
            this.lastPongTime = new Date().valueOf();
            if (receiveBytes[5] == OPCODE_TEXT && this.onMessageReceivedCB != null) {
                const msg = new TextDecoder().decode(receiveBytes.slice(6));
                this.onMessageReceivedCB(msg);
            }
            if (receiveBytes[5] == OPCODE_BINARY && this.onDataReceivedCB != null) {
                this.onDataReceivedCB(receiveBytes.slice(6));
            }
            if (receiveBytes[5] == OPCODE_PING) {
                this.sendPong();
            }
            if (receiveBytes[5] == OPCODE_PONG || receiveBytes[5] == OPCODE_CONTINUATION) {
                return;
            }
            if (receiveBytes[5] == OPCODE_CLOSE_CONN) {
                this.clientID = null;
                if (this.onDisconnectedCB != null) {
                    this.onDisconnectedCB();
                }
                this.socket.close();
            }

        });

        // 定时发送ping
        const pingTimer = setInterval(() => {
            if (this.clientID == null) {
                clearInterval(pingTimer);
                return;
            }
            this.sendPing();
            if (this.lastPongTime == 0)
                this.lastPongTime = new Date().valueOf();
            if (new Date().valueOf() - this.lastPongTime > 10000) {
                this.stop();
            }
        }, 100);

    }

    async receive() {
        return new Promise<Uint8Array>((resolve, reject) => {
            let resoleved = false;
            const messageCB = (data: any, rinfo: { address: string, port: number }) => {
                this.socket.off('message', messageCB);
                resolve(data);
                resoleved = true;

            }
            this.socket.on('message', messageCB);
            setTimeout(() => {
                if (!resoleved) {
                    this.socket.off('message', messageCB);
                    reject();

                }
            }, 5000);
        });
    }

    async waitBind(port: number) {
        return new Promise((resolve, reject) => {
            // 等待socket绑定
            let binded = false;
            const bindCB = () => {
                this.socket.off('listening', bindCB);
                binded = true;
                resolve(true);
            }
            this.socket.on('listening', bindCB);
            this.socket.bind(port)
            setTimeout(() => {
                if (!binded) {
                    this.socket.off('listening', bindCB);
                    reject();
                }
            }, 5000);
        })
    }

    stop() {
        if (this.clientID == null) {
            return;
        }
        const data = [0x02, ...this.clientID, OPCODE_CLOSE_CONN];
        this.socket.send(new Uint8Array(data), 0, data.length, this.port, this.host, (e) => {
            if (typeof e != 'undefined') {
                console.log("send close error");
                console.log(e);
            }
        });
        this.clientID = null;
        if (this.onDisconnectedCB != null) {
            this.onDisconnectedCB();
        }
        this.socket.close();
    }

    onOpen(cb: () => void) {
        this.onConnectedCB = cb;
    }

    onClose(cb: () => void) {
        this.onDisconnectedCB = cb;
    }

    onMessageReceived(cb: (msg: string) => void) {
        this.onMessageReceivedCB = cb;
    }

    onDataReceived(cb: (data: Uint8Array) => void) {
        this.onDataReceivedCB = cb;
    }

    onError(cb: () => void) {
        this.onErrorCB = cb;
    }

    isConnected() {
        return this.clientID != null;
    }

}

export default UdpFastClient;

// 创建局域网连接
let udpclient = new UdpFastClient("192.168.0.132:3547", "cmd_vel", token);
// 创建公网连接
let udpclient = new UdpFastClient("aaaaaaa.3547.robot1.bwbot.org:3547", "cmd_vel", token);
udpclient.onOpen(() => {
    options.onConnect && options.onConnect();
});

udpclient.onMessageReceived((data) => {
    options.onMessage && options.onMessage(data);
});

udpclient.onDataReceived((data) => {
    options.onData && options.onData(data);
});

udpclient.onError(() => {
    options.onError && options.onError();
});

udpclient.onClose(() => {
    options.onClose && options.onClose();
});

udpclient.start();

results matching ""

    No results matching ""