From 91f51391f276e5a40a28ddc673a6ccf341d80951 Mon Sep 17 00:00:00 2001 From: luclmt Date: Tue, 5 Apr 2022 19:00:18 +0200 Subject: [PATCH 1/2] send message on websocket --- README.md | 9 ++ go.mod | 1 + .../sseClients.go => clients/clients.go} | 63 +++++++---- pkg/http2mqtt/http2mqtt.go | 104 +++++++++++++++--- 4 files changed, 144 insertions(+), 33 deletions(-) rename pkg/{sseClients/sseClients.go => clients/clients.go} (51%) diff --git a/README.md b/README.md index 03896c3..c7f1678 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ http://localhost:8000/publish http://localhost:8000/subscribe http://localhost:8000/broker http://localhost:8000/streams +ws://localhost:8000/ws ``` @@ -105,6 +106,14 @@ data:"eyJjb21tYW5kIjoicmVzZXQifQ==" curl -u $user:$pass curl -i -X GET -H "Content-Type: application/json" http://localhost:8000/streams ``` + +##### localhost:8000/ws{GET} +websocket stream to receive messages of the subscribed topics +(using websocat from https://github.com/vi/websocat) +``` +websocat ws://localhost:8000/ws +``` + ### Development Want to contribute? Great! diff --git a/go.mod b/go.mod index e89e75b..ff7cd5a 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/gin-contrib/pprof v1.3.0 github.com/gin-gonic/gin v1.7.7 github.com/gorilla/mux v1.7.4 + github.com/gorilla/websocket v1.4.2 github.com/satori/go.uuid v1.2.0 github.com/sirupsen/logrus v1.6.0 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect diff --git a/pkg/sseClients/sseClients.go b/pkg/clients/clients.go similarity index 51% rename from pkg/sseClients/sseClients.go rename to pkg/clients/clients.go index 6c1f4ef..4f0726c 100644 --- a/pkg/sseClients/sseClients.go +++ b/pkg/clients/clients.go @@ -1,4 +1,4 @@ -package sseClients +package clients import ( "sync" @@ -6,34 +6,53 @@ import ( uuid "github.com/satori/go.uuid" ) -type SseCLients struct { - // clients map[*SseClient]bool +type Clients struct { + // clients map[*Client]bool clients sync.Map } -type SseClientData map[string]interface{} +type ClientData map[string]interface{} +type ClientType uint8 -type SseClient struct { - Id string - Data SseClientData +const ( + SseClient ClientType = 1 + WsClient ClientType = 2 +) + +func (ct ClientType) IsValid() bool { + switch ct { + case SseClient, WsClient: + return true + } + return false +} + +type Client struct { + Id string + Data ClientData + clientType ClientType } -func (m *SseCLients) RegisterNewCLient() *SseClient { +func (m *Clients) RegisterNewClient(clientType ClientType) *Client { var err error u1 := uuid.Must(uuid.NewV4(), err) - cl := SseClient{Id: u1.String(), Data: nil} + if !clientType.IsValid() { + panic("client type not handled") + } + + cl := Client{Id: u1.String(), Data: nil, clientType: clientType} return &cl } -// func (m *SseCLients) GetClientsList() map[*SseClient]bool { +// func (m *Clients) GetClientsList() map[*Client]bool { // return m.clients // } -func (m *SseCLients) GetRegisteredClients() int { +func (m *Clients) GetRegisteredClients() int { cnt := 0 m.clients.Range(func(key, value interface{}) bool { @@ -45,12 +64,16 @@ func (m *SseCLients) GetRegisteredClients() int { } -func (m *SseCLients) RegisterNewCLientWithData(data SseClientData) *SseClient { +func (m *Clients) RegisterNewClientWithData(clientType ClientType, data ClientData) *Client { var err error u1 := uuid.Must(uuid.NewV4(), err) - cl := SseClient{Id: u1.String(), Data: data} + if !clientType.IsValid() { + panic("client type not handled") + } + + cl := Client{Id: u1.String(), Data: data, clientType: clientType} //func (m *Map) Store(key, value interface{}) // Key and value are interface types, which can store data of any type @@ -59,14 +82,14 @@ func (m *SseCLients) RegisterNewCLientWithData(data SseClientData) *SseClient { return &cl } -func New() *SseCLients { +func New() *Clients { - manager := new(SseCLients) + manager := new(Clients) return manager } -func (manager *SseCLients) SetDisconnected(conn *SseClient) { +func (manager *Clients) SetDisconnected(conn *Client) { // func (m *Map) Load(key interface{}) (value interface{}, ok bool) // Key: key, value: if the data exists, the corresponding value will be returned; if not, nil will be returned. ok: indicates whether the value is found // Remember to process data assertions @@ -77,13 +100,13 @@ func (manager *SseCLients) SetDisconnected(conn *SseClient) { } -func (manager *SseCLients) RemoveCLient(conn *SseClient) { +func (manager *Clients) RemoveClient(conn *Client) { manager.clients.Delete(conn) } -func (m *SseCLients) PurgeDisconnected() { +func (m *Clients) PurgeDisconnected() { //func (m *Map) Range(f func(key, value interface{}) bool) // The callback function used by the user to process data. The parameters of the callback function are key, value and the return value is bool @@ -97,11 +120,11 @@ func (m *SseCLients) PurgeDisconnected() { } -func (m *SseCLients) FuncIterationForClients(fn func(client *SseClient, isConnected bool) bool) bool { +func (m *Clients) FuncIterationForClients(fn func(client *Client, isConnected bool) bool) bool { m.clients.Range(func(key, value interface{}) bool { if value == true { - if ok := fn(key.(*SseClient), value.(bool)); ok == false { + if ok := fn(key.(*Client), value.(bool)); ok == false { return false } } diff --git a/pkg/http2mqtt/http2mqtt.go b/pkg/http2mqtt/http2mqtt.go index b5dea95..fd8a3dd 100644 --- a/pkg/http2mqtt/http2mqtt.go +++ b/pkg/http2mqtt/http2mqtt.go @@ -2,21 +2,24 @@ package http2mqtt import ( "encoding/json" + log "github.com/sirupsen/logrus" "io" - "log" "math/rand" "net/http" "sync" "time" + "github.com/freedreamer82/go-http2mqtt/pkg/clients" + MQTT "github.com/eclipse/paho.mqtt.golang" - "github.com/freedreamer82/go-http2mqtt/pkg/sseClients" "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" //"time" ) const MaxClientIdLen = 14 +const channelClientSize = 50 func (s *Http2Mqtt) SetGinAuth(user string, password string) { s.user = user @@ -25,10 +28,10 @@ func (s *Http2Mqtt) SetGinAuth(user string, password string) { // PublishMessage is the sample data transfer object for publish http route type PublishMessage struct { - Topic string `binding:"required" json:"topic"` + Topic string `binding:"required" json:"topic"` Message json.RawMessage `binding:"required" json:"data"` - Qos byte `json:"qos"` - Retained bool `json:"retained"` + Qos byte `json:"qos"` + Retained bool `json:"retained"` } type SubScribeMessage struct { @@ -44,7 +47,7 @@ type Http2Mqtt struct { mqttOpts *MQTT.ClientOptions user string password string - sseCLients *sseClients.SseCLients + clients *clients.Clients subsMutex sync.Mutex subs []SubScribeMessage profileEnable bool @@ -53,6 +56,12 @@ type Http2Mqtt struct { streamChannel chan MQTT.Message } +type WsHandler struct { + id string + ws *websocket.Conn + send *chan MQTT.Message +} + func getRandomClientId() string { var characterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") b := make([]rune, MaxClientIdLen) @@ -195,11 +204,11 @@ func (m *Http2Mqtt) onBrokerData(client MQTT.Client, msg MQTT.Message) { } //m.mqttMsgChan <- msg - m.sseCLients.FuncIterationForClients(func(ssecl *sseClients.SseClient, isConnected bool) bool { + m.clients.FuncIterationForClients(func(cl *clients.Client, isConnected bool) bool { if isConnected { //casting to *chan of MQTT.Message - ch := ssecl.Data["chan"].(*chan MQTT.Message) + ch := cl.Data["chan"].(*chan MQTT.Message) *ch <- msg return true } @@ -265,7 +274,7 @@ func (m *Http2Mqtt) setupMQTT() { func (m *Http2Mqtt) setupGin() { - m.sseCLients = sseClients.New() + m.clients = clients.New() if m.Group == nil { m.Group = &m.Router.RouterGroup } @@ -339,14 +348,50 @@ func (m *Http2Mqtt) setupGin() { }) // Streams SSE + m.Group.GET(m.prefixRestApi+"/ws", func(c *gin.Context) { + + data := make(clients.ClientData) + + ch := make(chan MQTT.Message, channelClientSize) + data["chan"] = &ch //keep only the pointer here...and closinf from this function later(2 avoid copy) + + cl := m.clients.RegisterNewClientWithData(clients.WsClient, data) + log.Println("new connection id:" + cl.Id) + + var wsUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + ws, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + log.Errorf("Failed to set websocket upgrade: %v\n", err) + close(ch) + return + } + + wsHandler := &WsHandler{id: cl.Id, ws: ws, send: &ch} + + onClose := func() { + log.Infof("Closing ws connection") + close(ch) + ws.Close() + m.clients.RemoveClient(cl) + log.Infof("removing ws client: " + cl.Id) + } + + go wsHandler.wsReadProcess(onClose) + go wsHandler.wsWriteProcess(onClose) + }) + m.Group.GET(m.prefixRestApi+"/streams", func(c *gin.Context) { - data := make(sseClients.SseClientData) + data := make(clients.ClientData) - ch := make(chan MQTT.Message, 5) + ch := make(chan MQTT.Message, channelClientSize) data["chan"] = &ch //keep only the pointer here...and closinf from this function later(2 avoid copy) - cl := m.sseCLients.RegisterNewCLientWithData(data) + cl := m.clients.RegisterNewClientWithData(clients.SseClient, data) log.Println("new connection id:" + cl.Id) defer close(ch) @@ -366,7 +411,7 @@ func (m *Http2Mqtt) setupGin() { } case <-c.Writer.CloseNotify(): - m.sseCLients.RemoveCLient(cl) + m.clients.RemoveClient(cl) //close(ch) log.Println("removing sse client: " + cl.Id) return false @@ -380,3 +425,36 @@ func (m *Http2Mqtt) setupGin() { pprof.Register(m.Router, m.prefixRestApi+"/debug/pprof") } } + +func (h *WsHandler) wsWriteProcess(onClose func()) { + defer onClose() + for { + select { + case msg := <-*h.send: + if msg != nil { + err := h.ws.WriteMessage(websocket.TextMessage, msg.Payload()) + if err != nil { + log.Errorf("error: %v", err) + return + } + } + } + } +} + +func (h *WsHandler) wsReadProcess(onClose func()) { + defer onClose() + for { + _, message, err := h.ws.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err) || websocket.IsCloseError(err) || err == io.EOF { + log.Infof("Connection close client id: %s", h.id) + } else { + log.Errorf("error: %v", err) + } + break + } else { + log.Tracef("ws message received: %s", string(message)) + } + } +} From 7c411301f2613a7b4256519f6f84d8f9b8088402 Mon Sep 17 00:00:00 2001 From: luclmt Date: Tue, 5 Apr 2022 19:02:56 +0200 Subject: [PATCH 2/2] fix --- pkg/http2mqtt/http2mqtt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/http2mqtt/http2mqtt.go b/pkg/http2mqtt/http2mqtt.go index fd8a3dd..436a987 100644 --- a/pkg/http2mqtt/http2mqtt.go +++ b/pkg/http2mqtt/http2mqtt.go @@ -122,7 +122,7 @@ func (h *Http2Mqtt) GetMqttClient() *MQTT.Client { func (h *Http2Mqtt) EnableStream(status bool) { - h.streamEnabled = false + h.streamEnabled = status } func (h *Http2Mqtt) SetStreamChannel(ch chan MQTT.Message) {