这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ http://localhost:8000/publish
http://localhost:8000/subscribe
http://localhost:8000/broker
http://localhost:8000/streams
ws://localhost:8000/ws
```


Expand Down Expand Up @@ -105,6 +106,14 @@ data:"eyJjb21tYW5kIjoicmVzZXQifQ=="
curl -u $user:$pass -i -X GET -H "Content-Type: application/json" http://localhost:8000/streams
```


##### localhost:8000/ws{GET}
WebSocket stream that delivers the messages received on the subscribed topics.
Example using websocat (https://github.com/vi/websocat):
```
websocat ws://localhost:8000/ws
```

### Development

Want to contribute? Great!
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.7.7
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.6.0
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect
Expand Down
63 changes: 43 additions & 20 deletions pkg/sseClients/sseClients.go → pkg/clients/clients.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,58 @@
package sseClients
package clients

import (
"sync"

uuid "github.com/satori/go.uuid"
)

type SseCLients struct {
// clients map[*SseClient]bool
type Clients struct {
// clients map[*Client]bool
clients sync.Map
}

type SseClientData map[string]interface{}
type ClientData map[string]interface{}
type ClientType uint8

type SseClient struct {
Id string
Data SseClientData
const (
SseClient ClientType = 1
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add flags in main to set up these features.

WsClient ClientType = 2
)

// IsValid reports whether ct is one of the client types this package
// handles, i.e. SseClient or WsClient.
func (ct ClientType) IsValid() bool {
	return ct == SseClient || ct == WsClient
}

type Client struct {
Id string
Data ClientData
clientType ClientType
}

func (m *SseCLients) RegisterNewCLient() *SseClient {
func (m *Clients) RegisterNewClient(clientType ClientType) *Client {

var err error
u1 := uuid.Must(uuid.NewV4(), err)

cl := SseClient{Id: u1.String(), Data: nil}
if !clientType.IsValid() {
panic("client type not handled")
}

cl := Client{Id: u1.String(), Data: nil, clientType: clientType}

return &cl
}

// func (m *SseCLients) GetClientsList() map[*SseClient]bool {
// func (m *Clients) GetClientsList() map[*Client]bool {

// return m.clients
// }

func (m *SseCLients) GetRegisteredClients() int {
func (m *Clients) GetRegisteredClients() int {

cnt := 0
m.clients.Range(func(key, value interface{}) bool {
Expand All @@ -45,12 +64,16 @@ func (m *SseCLients) GetRegisteredClients() int {

}

func (m *SseCLients) RegisterNewCLientWithData(data SseClientData) *SseClient {
func (m *Clients) RegisterNewClientWithData(clientType ClientType, data ClientData) *Client {

var err error
u1 := uuid.Must(uuid.NewV4(), err)

cl := SseClient{Id: u1.String(), Data: data}
if !clientType.IsValid() {
panic("client type not handled")
}

cl := Client{Id: u1.String(), Data: data, clientType: clientType}

//func (m *Map) Store(key, value interface{})
// Key and value are interface types, which can store data of any type
Expand All @@ -59,14 +82,14 @@ func (m *SseCLients) RegisterNewCLientWithData(data SseClientData) *SseClient {
return &cl
}

func New() *SseCLients {
func New() *Clients {

manager := new(SseCLients)
manager := new(Clients)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can avoid the new

c := Clients{}

return &c


return manager
}

func (manager *SseCLients) SetDisconnected(conn *SseClient) {
func (manager *Clients) SetDisconnected(conn *Client) {
// func (m *Map) Load(key interface{}) (value interface{}, ok bool)
// Key: key, value: if the data exists, the corresponding value will be returned; if not, nil will be returned. ok: indicates whether the value is found
// Remember to process data assertions
Expand All @@ -77,13 +100,13 @@ func (manager *SseCLients) SetDisconnected(conn *SseClient) {

}

func (manager *SseCLients) RemoveCLient(conn *SseClient) {
func (manager *Clients) RemoveClient(conn *Client) {

manager.clients.Delete(conn)

}

func (m *SseCLients) PurgeDisconnected() {
func (m *Clients) PurgeDisconnected() {

//func (m *Map) Range(f func(key, value interface{}) bool)
// The callback function used by the user to process data. The parameters of the callback function are key, value and the return value is bool
Expand All @@ -97,11 +120,11 @@ func (m *SseCLients) PurgeDisconnected() {

}

func (m *SseCLients) FuncIterationForClients(fn func(client *SseClient, isConnected bool) bool) bool {
func (m *Clients) FuncIterationForClients(fn func(client *Client, isConnected bool) bool) bool {

m.clients.Range(func(key, value interface{}) bool {
if value == true {
if ok := fn(key.(*SseClient), value.(bool)); ok == false {
if ok := fn(key.(*Client), value.(bool)); ok == false {
return false
}
}
Expand Down
106 changes: 92 additions & 14 deletions pkg/http2mqtt/http2mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ package http2mqtt

import (
"encoding/json"
log "github.com/sirupsen/logrus"
"io"
"log"
"math/rand"
"net/http"
"sync"
"time"

"github.com/freedreamer82/go-http2mqtt/pkg/clients"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/freedreamer82/go-http2mqtt/pkg/sseClients"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
//"time"
)

const MaxClientIdLen = 14
const channelClientSize = 50

func (s *Http2Mqtt) SetGinAuth(user string, password string) {
s.user = user
Expand All @@ -25,10 +28,10 @@ func (s *Http2Mqtt) SetGinAuth(user string, password string) {

// PublishMessage is the sample data transfer object for publish http route
type PublishMessage struct {
Topic string `binding:"required" json:"topic"`
Topic string `binding:"required" json:"topic"`
Message json.RawMessage `binding:"required" json:"data"`
Qos byte `json:"qos"`
Retained bool `json:"retained"`
Qos byte `json:"qos"`
Retained bool `json:"retained"`
}

type SubScribeMessage struct {
Expand All @@ -44,7 +47,7 @@ type Http2Mqtt struct {
mqttOpts *MQTT.ClientOptions
user string
password string
sseCLients *sseClients.SseCLients
clients *clients.Clients
subsMutex sync.Mutex
subs []SubScribeMessage
profileEnable bool
Expand All @@ -53,6 +56,12 @@ type Http2Mqtt struct {
streamChannel chan MQTT.Message
}

// WsHandler bundles the per-connection state used by the websocket read and
// write goroutines (wsReadProcess and wsWriteProcess).
type WsHandler struct {
// id is the client id; it is used in log messages when the connection closes.
id string
// ws is the upgraded websocket connection that is read from and written to.
ws *websocket.Conn
// send points to the channel whose MQTT messages are written to ws.
send *chan MQTT.Message
}

func getRandomClientId() string {
var characterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
b := make([]rune, MaxClientIdLen)
Expand Down Expand Up @@ -113,7 +122,7 @@ func (h *Http2Mqtt) GetMqttClient() *MQTT.Client {

func (h *Http2Mqtt) EnableStream(status bool) {

h.streamEnabled = false
h.streamEnabled = status
}

func (h *Http2Mqtt) SetStreamChannel(ch chan MQTT.Message) {
Expand Down Expand Up @@ -195,11 +204,11 @@ func (m *Http2Mqtt) onBrokerData(client MQTT.Client, msg MQTT.Message) {
}

//m.mqttMsgChan <- msg
m.sseCLients.FuncIterationForClients(func(ssecl *sseClients.SseClient, isConnected bool) bool {
m.clients.FuncIterationForClients(func(cl *clients.Client, isConnected bool) bool {

if isConnected {
//casting to *chan of MQTT.Message
ch := ssecl.Data["chan"].(*chan MQTT.Message)
ch := cl.Data["chan"].(*chan MQTT.Message)
*ch <- msg
return true
}
Expand Down Expand Up @@ -265,7 +274,7 @@ func (m *Http2Mqtt) setupMQTT() {

func (m *Http2Mqtt) setupGin() {

m.sseCLients = sseClients.New()
m.clients = clients.New()
if m.Group == nil {
m.Group = &m.Router.RouterGroup
}
Expand Down Expand Up @@ -339,14 +348,50 @@ func (m *Http2Mqtt) setupGin() {
})

// Streams: WebSocket (/ws) followed by SSE (/streams)
// WebSocket stream: upgrades the request, registers a client whose channel
// receives the broker messages, and starts one goroutine reading from the
// socket and one writing the queued messages to it.
m.Group.GET(m.prefixRestApi+"/ws", func(c *gin.Context) {

	wsUpgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}

	// Upgrade before registering the client, so that a failed handshake
	// never leaves a registered client behind whose channel is unusable.
	ws, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Errorf("Failed to set websocket upgrade: %v\n", err)
		return
	}

	data := make(clients.ClientData)

	ch := make(chan MQTT.Message, channelClientSize)
	data["chan"] = &ch // keep only the pointer here; the channel is closed by onClose below

	cl := m.clients.RegisterNewClientWithData(clients.WsClient, data)
	log.Println("new connection id:" + cl.Id)

	wsHandler := &WsHandler{id: cl.Id, ws: ws, send: &ch}

	// onClose is handed to both goroutines and may therefore be invoked
	// twice; sync.Once guarantees the teardown (in particular close(ch),
	// which panics on a second call) runs exactly once.
	var closeOnce sync.Once
	onClose := func() {
		closeOnce.Do(func() {
			log.Infof("Closing ws connection")
			// Unregister first so new broker messages stop targeting this
			// client before its channel is closed.
			// NOTE(review): a send already in progress can still race with
			// close(ch) — confirm whether the broker side needs a guard.
			m.clients.RemoveClient(cl)
			close(ch)
			ws.Close()
			log.Infof("removing ws client: %s", cl.Id)
		})
	}

	go wsHandler.wsReadProcess(onClose)
	go wsHandler.wsWriteProcess(onClose)
})

m.Group.GET(m.prefixRestApi+"/streams", func(c *gin.Context) {

data := make(sseClients.SseClientData)
data := make(clients.ClientData)

ch := make(chan MQTT.Message, 5)
ch := make(chan MQTT.Message, channelClientSize)
data["chan"] = &ch //keep only the pointer here...and closinf from this function later(2 avoid copy)

cl := m.sseCLients.RegisterNewCLientWithData(data)
cl := m.clients.RegisterNewClientWithData(clients.SseClient, data)
log.Println("new connection id:" + cl.Id)

defer close(ch)
Expand All @@ -366,7 +411,7 @@ func (m *Http2Mqtt) setupGin() {
}

case <-c.Writer.CloseNotify():
m.sseCLients.RemoveCLient(cl)
m.clients.RemoveClient(cl)
//close(ch)
log.Println("removing sse client: " + cl.Id)
return false
Expand All @@ -380,3 +425,36 @@ func (m *Http2Mqtt) setupGin() {
pprof.Register(m.Router, m.prefixRestApi+"/debug/pprof")
}
}

// wsWriteProcess forwards every MQTT message received on h.send to the
// websocket as a text frame carrying the message payload. Nil messages are
// skipped.
//
// The loop ends when the channel is closed or when a write fails. onClose is
// called only on a write failure: in the /ws route handler that starts this
// goroutine the channel is closed by onClose itself, so a closed channel
// means the teardown has already run and must not be repeated. Without the
// closed-channel check the receive would yield nil immediately forever and
// the goroutine would spin.
func (h *WsHandler) wsWriteProcess(onClose func()) {
	for msg := range *h.send {
		if msg == nil {
			continue
		}
		if err := h.ws.WriteMessage(websocket.TextMessage, msg.Payload()); err != nil {
			log.Errorf("error: %v", err)
			onClose()
			return
		}
	}
}

// wsReadProcess reads frames from the websocket until a read fails, then
// returns; onClose runs on the way out.
//
// Received messages are only trace-logged. A read error that is a websocket
// close error or io.EOF is logged at info level as a client disconnect; any
// other error is logged as an error.
func (h *WsHandler) wsReadProcess(onClose func()) {
	defer onClose()

	for {
		_, payload, readErr := h.ws.ReadMessage()
		if readErr == nil {
			log.Tracef("ws message received: %s", string(payload))
			continue
		}

		peerClosed := websocket.IsUnexpectedCloseError(readErr) ||
			websocket.IsCloseError(readErr) ||
			readErr == io.EOF
		if peerClosed {
			log.Infof("Connection close client id: %s", h.id)
		} else {
			log.Errorf("error: %v", readErr)
		}
		return
	}
}