1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
package utils
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
log "github.com/soesoftcn/process-manage/pkg/soelog"
"github.com/surgemq/message"
s "github.com/surgemq/surgemq/service"
"golang.org/x/net/websocket"
)
type ServerMessage struct {
IsBroadCast bool `json:"isBroadCast"`
ClientId string `json:"clientId"`
MsgType int `json:"msgType"`
Msg interface{} `json:"msg"`
}
var MqttSvr *s.Server
func InitMQTTServer() {
MqttSvr = &s.Server{
KeepAlive: 300, // seconds
ConnectTimeout: 2, // seconds
SessionsProvider: "mem", // keeps sessions in memory
Authenticator: "mockSuccess", // always succeed
TopicsProvider: "mem", // keeps topic subscriptions in memory
}
mqttaddr := "tcp://:1883"
addr := "tcp://127.0.0.1:1883"
AddWebsocketHandler("/mqtt", addr)
wsAddr := ":1882"
go ListenAndServeWebsocket(wsAddr)
err := MqttSvr.ListenAndServe(mqttaddr)
if err != nil {
log.Logger.Sugar().Errorf("surgemq/main: %v", err)
}
}
/* copy from reader to websocket, this copies the binary frames as is */
func io_ws_copy(src io.Reader, dst *websocket.Conn) (int, error) {
buffer := make([]byte, 2048)
count := 0
for {
n, err := src.Read(buffer)
if err != nil || n < 1 {
return count, err
}
count += n
err = websocket.Message.Send(dst, buffer[0:n])
if err != nil {
return count, err
}
}
return count, nil
}
/* copy from websocket to writer, this copies the binary frames as is */
func io_copy_ws(src *websocket.Conn, dst io.Writer) (int, error) {
var buffer []byte
count := 0
for {
err := websocket.Message.Receive(src, &buffer)
if err != nil {
return count, err
}
n := len(buffer)
count += n
i, err := dst.Write(buffer)
if err != nil || i < 1 {
return count, err
}
}
return count, nil
}
func WebsocketTcpProxy(ws *websocket.Conn, nettype string, host string) error {
client, err := net.Dial(nettype, host)
if err != nil {
return err
}
defer client.Close()
defer ws.Close()
chDone := make(chan bool)
go func() {
io_ws_copy(client, ws)
chDone <- true
}()
go func() {
io_copy_ws(ws, client)
chDone <- true
}()
<-chDone
return nil
}
func AddWebsocketHandler(urlPattern string, uri string) error {
sugar := log.Logger.Sugar()
sugar.Infof("AddWebsocketHandler urlPattern=%s, uri=%s", urlPattern, uri)
u, err := url.Parse(uri)
if err != nil {
sugar.Errorf("surgemq/main: %v", err)
return err
}
h := func(ws *websocket.Conn) {
WebsocketTcpProxy(ws, u.Scheme, u.Host)
}
http.Handle(urlPattern, websocket.Handler(h))
return nil
}
func ListenAndServeWebsocket(addr string) error {
return http.ListenAndServe(addr, nil)
}
func MessageFromServer(i ServerMessage) {
msg := message.NewPublishMessage()
payload, err := json.Marshal(i)
if err != nil {
fmt.Println(err)
}
msg.SetTopic([]byte("/superserver/notify"))
msg.SetPayload(payload)
MqttSvr.Publish(msg, nil)
}
|