Use MQTT with Golang and Angular 6

使用 Go 作为 MQTT 服务端，Angular 作为客户端，进行消息交互。

Go 服务端这边：

  • 需要能够监听MQTT Broker的tcp端口(默认:1883)
  • 需要能够监听MQTT Websocket的 ws 端口
  • 提供一个广播或者指定发送到某个客户端的方法

直接上代码：

package utils

import (
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"

	log "github.com/soesoftcn/process-manage/pkg/soelog"
	"github.com/surgemq/message"
	s "github.com/surgemq/surgemq/service"
	"golang.org/x/net/websocket"
)

// ServerMessage is the envelope that MessageFromServer JSON-encodes and uses
// as the payload of a server-originated MQTT publish.
type ServerMessage struct {
	// IsBroadCast is encoded as "isBroadCast". Presumably true means the
	// message is meant for every client rather than only ClientId; nothing in
	// this file reads it, so confirm against the subscribers.
	IsBroadCast bool        `json:"isBroadCast"`
	// ClientId is encoded as "clientId". Presumably the intended recipient
	// when IsBroadCast is false (TODO confirm).
	ClientId    string      `json:"clientId"`
	// MsgType is encoded as "msgType"; the meaning of its values is defined by
	// the consumers, not by this file.
	MsgType     int         `json:"msgType"`
	// Msg is encoded as "msg" and may hold any JSON-encodable value.
	Msg         interface{} `json:"msg"`
}

// MqttSvr is the package-wide surgemq server. It is nil until InitMQTTServer
// assigns it; MessageFromServer publishes through it.
var MqttSvr *s.Server

func InitMQTTServer() {
	MqttSvr = &s.Server{
		KeepAlive:        300,           // seconds
		ConnectTimeout:   2,             // seconds
		SessionsProvider: "mem",         // keeps sessions in memory
		Authenticator:    "mockSuccess", // always succeed
		TopicsProvider:   "mem",         // keeps topic subscriptions in memory
	}

	mqttaddr := "tcp://:1883"
	addr := "tcp://127.0.0.1:1883"
	AddWebsocketHandler("/mqtt", addr)
	wsAddr := ":1882"
	go ListenAndServeWebsocket(wsAddr)

	err := MqttSvr.ListenAndServe(mqttaddr)
	if err != nil {
		log.Logger.Sugar().Errorf("surgemq/main: %v", err)
	}
}

// io_ws_copy copies from src to the WebSocket dst until a read or send fails
// or a read yields no data. Every chunk read (at most 2048 bytes) is sent
// unmodified as one WebSocket message.
//
// It returns the number of bytes successfully sent and the error that ended
// the copy. A read error such as io.EOF is returned as-is; the error is nil
// only when a read returned zero bytes without an error.
func io_ws_copy(src io.Reader, dst *websocket.Conn) (int, error) {
	buffer := make([]byte, 2048)
	count := 0
	for {
		n, readErr := src.Read(buffer)
		// An io.Reader may return data together with an error (typically
		// io.EOF), so forward the bytes before acting on the error.
		if n > 0 {
			if err := websocket.Message.Send(dst, buffer[:n]); err != nil {
				return count, err
			}
			count += n
		}
		if readErr != nil || n < 1 {
			return count, readErr
		}
	}
}

// io_copy_ws copies from the WebSocket src to dst until a receive or write
// fails. The content of every received message is written unmodified.
//
// It returns the number of bytes actually written and the error that ended
// the copy. The function only returns with a non-nil error.
func io_copy_ws(src *websocket.Conn, dst io.Writer) (int, error) {
	var buffer []byte
	count := 0
	for {
		if err := websocket.Message.Receive(src, &buffer); err != nil {
			return count, err
		}
		// An empty message carries no data. Writing it would report zero
		// bytes, which must not be mistaken for a failed write.
		if len(buffer) == 0 {
			continue
		}
		n, err := dst.Write(buffer)
		count += n
		if err != nil {
			return count, err
		}
		// A conforming io.Writer reports an error on a short write; guard
		// against writers that do not.
		if n < len(buffer) {
			return count, io.ErrShortWrite
		}
	}
}

// WebsocketTcpProxy relays traffic between the WebSocket connection ws and a
// newly dialed connection to host on network nettype (e.g. "tcp").
//
// It returns the dial error if the backend cannot be reached. Otherwise it
// blocks until either copy direction ends, closes both connections and
// returns nil.
func WebsocketTcpProxy(ws *websocket.Conn, nettype string, host string) error {
	client, err := net.Dial(nettype, host)
	if err != nil {
		return err
	}
	defer client.Close()
	defer ws.Close()

	// Capacity 2: only the first completion is received below, so the second
	// goroutine must be able to send without a receiver. With an unbuffered
	// channel it would block forever and leak one goroutine per connection.
	chDone := make(chan struct{}, 2)

	go func() {
		// The copy error is not reported: a closed peer is the normal way
		// for a proxy session to end.
		io_ws_copy(client, ws)
		chDone <- struct{}{}
	}()
	go func() {
		io_copy_ws(ws, client)
		chDone <- struct{}{}
	}()

	// One direction ending is enough. The deferred Close calls then make the
	// other direction's pending read or receive fail, so it finishes too.
	<-chDone
	return nil
}

// AddWebsocketHandler registers, on the default HTTP mux, a WebSocket handler
// for urlPattern that proxies every connection to the backend named by uri.
//
// uri is parsed with url.Parse; its scheme is used as the network and its
// host as the dial address (e.g. "tcp://127.0.0.1:1883"). A parse failure is
// logged and returned. Note that http.Handle panics if urlPattern has already
// been registered.
func AddWebsocketHandler(urlPattern string, uri string) error {
	sugar := log.Logger.Sugar()

	sugar.Infof("AddWebsocketHandler urlPattern=%s, uri=%s", urlPattern, uri)
	u, err := url.Parse(uri)
	if err != nil {
		sugar.Errorf("surgemq/main: %v", err)
		return err
	}

	h := func(ws *websocket.Conn) {
		// The handler cannot return an error, so log a failed backend dial
		// here; otherwise the client is simply disconnected without a trace.
		if err := WebsocketTcpProxy(ws, u.Scheme, u.Host); err != nil {
			sugar.Errorf("surgemq/main: websocket proxy to %s://%s: %v", u.Scheme, u.Host, err)
		}
	}
	http.Handle(urlPattern, websocket.Handler(h))
	return nil
}

// ListenAndServeWebsocket serves HTTP on addr and returns the error reported
// by http.ListenAndServe.
func ListenAndServeWebsocket(addr string) error {
	// A nil handler makes net/http dispatch through http.DefaultServeMux,
	// which is where http.Handle registers handlers.
	srvErr := http.ListenAndServe(addr, nil)
	return srvErr
}

// MessageFromServer JSON-encodes i and publishes it through MqttSvr on the
// topic "/superserver/notify".
//
// The function has no return value, so failures are printed to standard
// output and the message is dropped. Nothing is published when MqttSvr has
// not been set yet, when i cannot be marshalled, or when the topic is
// rejected.
func MessageFromServer(i ServerMessage) {
	// MqttSvr is only assigned by InitMQTTServer; publishing through a nil
	// server would panic.
	if MqttSvr == nil {
		fmt.Println("MessageFromServer: MQTT server is not initialized")
		return
	}

	payload, err := json.Marshal(i)
	if err != nil {
		fmt.Println(err)
		// Do not publish a message with an empty payload.
		return
	}

	msg := message.NewPublishMessage()
	if err := msg.SetTopic([]byte("/superserver/notify")); err != nil {
		fmt.Println(err)
		return
	}
	msg.SetPayload(payload)

	if err := MqttSvr.Publish(msg, nil); err != nil {
		fmt.Println(err)
	}
}

Licensed under CC BY-NC-SA 4.0
记录平时瞎折腾遇到的各种问题, 方便查找
使用 Hugo 构建
主题 Stack 3.29.0 由 Jimmy 设计