diff --git a/README.md b/README.md
index 077a096..8d2f566 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,7 @@ go console is a go lib to implement consoles, it includes:
 - Std output console
 - Telnet console
 - SSH console
+- MQTT console (built on the mqtt-shell library by freedreamer82)
 - Other Consoles can be added easily(BLE,Serial etc..)
 
 
@@ -54,6 +55,20 @@ sshc, _ := console.NewSSHConsoleWithCertificates(sshPrivateKeyPath, sshAuthorize
 go sshc.Start("localhost", sshPort, 2)
 ```
 
+
+### Mqtt console example
+```go
+func onNewConsole(console *console.Console) {
+	console.EnableLogin("root")
+}
+
+opt1 := console.WithOptionMqttConsoleMaxConnections(3)
+opt2 := console.WithOptionMqttConsoleTimeout(timeoutSec * time.Second)
+mqttConsole := console.NewMqttConsole(clientOps, "test", opt1, opt2)
+mqttConsole.AddCallbackOnNewConsole(onNewConsole)
+go mqttConsole.Start()
+```
+
 ### go console api
 handle operation
 - func (c *Console) Start()
@@ -155,4 +170,7 @@ go build
 
 //console ssh (pub keys)
 ./server 3
+
+//console mqtt
+./server 4
 ```
\ No newline at end of file
diff --git a/examples/server/main.go b/examples/server/main.go
index ae235d6..8b0ff5d 100644
--- a/examples/server/main.go
+++ b/examples/server/main.go
@@ -2,7 +2,10 @@ package main
 
 import (
 	"fmt"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/freedreamer82/go-console/examples/utils"
 	"github.com/freedreamer82/go-console/pkg/console"
+	log "github.com/sirupsen/logrus"
 	"os"
 	"strconv"
 	"time"
@@ -19,7 +22,7 @@ var users = map[string]string{
 
 const sshPrivateKeyPath = "examples/utils/server_rsa"
 const sshPrivateKeyPassPhrase = "test"
-const timeoutSec = 10
+const timeoutSec = 20
 
 //client1 and client2 are authorized
 const sshAuthorizedKeysPath = "examples/utils/authorized_keys"
@@ -29,10 +32,11 @@ const (
 	Telnet
 	SSHPassword
 	SSHPublicKey
+	Mqtt
 )
 
 //change this const to start another console
-const example = StdOutput
+const example = Mqtt
 
 func main() {
 
@@ -51,11 +55,13 @@ func main() {
 		startSSHPasswordConsole()
 	case SSHPublicKey:
 		startSSHPublicKeyConsole()
+	case Mqtt:
+		startMqttConsole()
 	}
 
 }
 
-func onNewTelnetConsole(console *console.Console) {
+func onNewConsole(console *console.Console) {
 	console.EnableLogin("root")
 }
 
@@ -73,7 +79,7 @@ func startTelnetConsole() {
 	fmt.Printf("opening Telnet console on localhost %d", telnetPort)
 	fmt.Println()
 	ct := console.NewTelnetConsole(telnetPort, 2)
-	ct.AddCallbackOnNewConsole(onNewTelnetConsole)
+	ct.AddCallbackOnNewConsole(onNewConsole)
 	ct.SetTimeout(timeoutSec * time.Second)
 	select {}
 }
@@ -97,3 +103,22 @@ func startSSHPublicKeyConsole() {
 	go sshc.Start("localhost", sshPort, 2)
 	select {}
 }
+
+func startMqttConsole() {
+	fmt.Printf("opening Mqtt console")
+	fmt.Println()
+	clientOps := mqtt.NewClientOptions()
+	addr := fmt.Sprintf("tcp://%s:%d", utils.BrokerHost, utils.BrokerPort)
+	log.Info("Connecting to : " + addr)
+	clientOps.AddBroker(addr)
+	if utils.BrokerUser != "" && utils.BrokerPassword != "" {
+		clientOps.SetUsername(utils.BrokerUser)
+		clientOps.SetPassword(utils.BrokerPassword)
+	}
+	opt1 := console.WithOptionMqttConsoleMaxConnections(3)
+	opt2 := console.WithOptionMqttConsoleTimeout(timeoutSec * time.Second)
+	mqttConsole := console.NewMqttConsole(clientOps, "test", opt1, opt2)
+	mqttConsole.AddCallbackOnNewConsole(onNewConsole)
+	go mqttConsole.Start()
+	select {}
+}
diff --git a/examples/utils/brokerconf.go b/examples/utils/brokerconf.go
new file mode 100644
index 0000000..7bdbb7d
--- /dev/null
+++ b/examples/utils/brokerconf.go
@@ -0,0 +1,12 @@
+package utils
+
+// MQTT broker settings read by the example server.
+const (
+	// BrokerHost and BrokerPort form the broker address "tcp://host:port".
+	BrokerHost = ""
+	BrokerPort = 1883
+
+	// BrokerUser and BrokerPassword are applied only when both are non-empty.
+	BrokerUser     = ""
+	BrokerPassword = ""
+)
diff --git a/go.mod b/go.mod
index d13a37f..dc66286 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,10 @@
-module github.com/freedreamer82/go-console
+module github.com/freedreamer82/go-console
 
 go 1.18
 
 require (
+	github.com/eclipse/paho.mqtt.golang v1.4.1
+	github.com/freedreamer82/mqtt-shell v0.0.0-20220624081242-fdcac9e75f9d
 	github.com/lithammer/shortuuid/v3 v3.0.7
 	github.com/sirupsen/logrus v1.8.1
 	golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9
@@ -10,6 +12,9 @@ require (
 )
 
 require (
-	github.com/google/uuid v1.2.0 // indirect
-	golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
+	github.com/google/uuid v1.3.0 // indirect
+	github.com/gorilla/websocket v1.5.0 // indirect
+	golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect
+	golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
+	golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d // indirect
 )
diff --git a/pkg/console/console.go b/pkg/console/console.go
index e9a1f40..6adbbb6 100644
--- a/pkg/console/console.go
+++ b/pkg/console/console.go
@@ -72,7 +72,18 @@ type Console struct {
 	uuid string
 }
 
-func NewConsole(iorw ConsoleI) *Console {
+// ConsoleOption customizes a Console while NewConsole is building it.
+type ConsoleOption func(console *Console)
+
+// WithOptionCustomUUID returns an option that sets the console's uuid to uuid.
+func WithOptionCustomUUID(uuid string) ConsoleOption {
+	return func(console *Console) {
+		console.uuid = uuid
+	}
+}
+
+// NewConsole builds a Console on top of iorw, then applies opts in order.
+func NewConsole(iorw ConsoleI, opts ...ConsoleOption) *Console {
 	c := Console{term: terminal.NewTerminal(iorw, prompt), eol: eol, mask: 0, welcome: defaultWelcome, userLevel: Root,
 		iorw: iorw, onclose: nil}
 
@@ -86,6 +97,10 @@ func NewConsole(iorw ConsoleI) *Console {
 	c.timeout = 0
 	c.lastActivitytime = time.Now()
 
+	for _, opt := range opts {
+		opt(&c)
+	}
+
 	c.AddCallbackOnClose(c.dummyCb)
 	log.Printf("Open Console %s", c.uuid)
 	return &c
@@ -184,6 +199,7 @@ func (c *Console) handleLogin(cmd string) bool {
 	if cmd == c.password {
 		c.mask.ToggleFlag(USER_LOGGED)
 		c.enablePrompt(true)
+		c.Print("Authenticated")
 		return true
 	}
 	return false
diff --git a/pkg/console/consolemqtt.go b/pkg/console/consolemqtt.go
new file mode 100644
index 0000000..f9198de
--- /dev/null
+++ b/pkg/console/consolemqtt.go
@@ -0,0 +1,300 @@
+package console
+
+import (
+	"fmt"
+	"io"
+	"strings"
+	"sync"
+	"time"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	mqttinfo "github.com/freedreamer82/mqtt-shell/pkg/info"
+	mqttchat "github.com/freedreamer82/mqtt-shell/pkg/mqttchat"
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	// defaultPresentationMessage is a payload that Read swallows instead of
+	// passing it on to the console.
+	defaultPresentationMessage = "whoami"
+
+	topicPrefix = "/mqtt-shell/"
+
+	// Topic templates; %s is filled with the instance ID given to NewMqttConsole.
+	templateRxTopic     = topicPrefix + "%s/cmd"
+	templateTxTopic     = topicPrefix + "%s/cmd/res"
+	templateBeaconEvent = topicPrefix + "%s/event"
+
+	topicBeaconRequest = topicPrefix + "whoami"
+
+	defaultMqttConsoleWelcome = "*** Welcome MQTT Console ***"
+
+	// closeNotice is the last payload Close queues before it closes the input
+	// channel.
+	closeNotice = "onclose"
+
+	// Buffer sizes of the per-connection input queue and the shared output queue.
+	inQueueSize  = 20
+	outQueueSize = 100
+)
+
+// outMessage is one chunk of console output waiting to be transmitted.
+type outMessage struct {
+	msg        string
+	clientUUID string
+	cmdUUID    string
+}
+
+// mqttConsoleConnection is the Read/Write/Close endpoint a Console uses to
+// talk to a single MQTT client.
+type mqttConsoleConnection struct {
+	uuid string
+	in   chan []byte
+	out  chan<- outMessage
+
+	mu          sync.Mutex // guards lastCmdUUID and closed
+	lastCmdUUID string
+	closed      bool
+
+	pending []byte // unread rest of the current command; touched by Read only
+}
+
+// Read copies the next received command, terminated by "\r\n", into b. A
+// command longer than b is handed out over several calls instead of being
+// truncated. The presentation message yields (0, nil); once the connection
+// is closed and drained Read returns io.EOF.
+func (conn *mqttConsoleConnection) Read(b []byte) (int, error) {
+	if len(conn.pending) == 0 {
+		data, ok := <-conn.in
+		if !ok {
+			return 0, io.EOF
+		}
+		if string(data) == defaultPresentationMessage {
+			return 0, nil
+		}
+		conn.pending = append(data, '\r', '\n')
+	}
+	n := copy(b, conn.pending)
+	conn.pending = conn.pending[n:]
+	return n, nil
+}
+
+// Close marks the connection closed, queues closeNotice when there is room
+// for it and closes the input channel. Calling it again is a no-op.
+func (conn *mqttConsoleConnection) Close() error {
+	conn.mu.Lock()
+	defer conn.mu.Unlock()
+	if conn.closed {
+		return nil
+	}
+	conn.closed = true
+	// Never block here: nobody may be reading a full queue any more.
+	select {
+	case conn.in <- []byte(closeNotice):
+	default:
+	}
+	close(conn.in)
+	return nil
+}
+
+// Write queues b, stripped of surrounding CR/LF characters, for transmission
+// to the client. After Close the data is dropped. It always reports len(b)
+// bytes written.
+func (conn *mqttConsoleConnection) Write(b []byte) (int, error) {
+	conn.mu.Lock()
+	closed, cmdUUID := conn.closed, conn.lastCmdUUID
+	conn.mu.Unlock()
+	if !closed {
+		msg := strings.Trim(string(b), "\r\n")
+		// Send outside the lock so a full output queue cannot stall Close or deliver.
+		conn.out <- outMessage{msg: msg, clientUUID: conn.uuid, cmdUUID: cmdUUID}
+	}
+	return len(b), nil
+}
+
+// deliver records cmdUUID as the command being answered and queues data for
+// Read. It reports false, without blocking, when the connection is closed or
+// its input queue is full.
+func (conn *mqttConsoleConnection) deliver(cmdUUID string, data []byte) bool {
+	conn.mu.Lock()
+	defer conn.mu.Unlock()
+	if conn.closed {
+		return false
+	}
+	conn.lastCmdUUID = cmdUUID
+	select {
+	case conn.in <- data:
+		return true
+	default:
+		return false
+	}
+}
+
+// MqttConsole serves one Console per MQTT client on top of an mqtt-shell chat.
+type MqttConsole struct {
+	mqttChat             *mqttchat.MqttChat
+	connections          sync.Map // client UUID -> *mqttConsoleConnection
+	consoles             sync.Map // client UUID -> *Console
+	callbackOnNewConsole OnNewConsole
+	timeout              time.Duration
+	maxConnections       int
+	chOut                chan outMessage
+}
+
+// MqttConsoleOption customizes a MqttConsole built by NewMqttConsole.
+type MqttConsoleOption func(console *MqttConsole)
+
+// WithOptionMqttConsoleMaxConnections limits the number of simultaneous
+// clients; a value of 0 or less means no limit.
+func WithOptionMqttConsoleMaxConnections(maxConnections int) MqttConsoleOption {
+	return func(console *MqttConsole) {
+		console.maxConnections = maxConnections
+	}
+}
+
+// WithOptionMqttConsoleTimeout sets the timeout handed to every new Console
+// through SetTimeout.
+func WithOptionMqttConsoleTimeout(timeout time.Duration) MqttConsoleOption {
+	return func(console *MqttConsole) {
+		console.timeout = timeout
+	}
+}
+
+// AddCallbackOnNewConsole registers cb to run on every newly created Console;
+// it replaces any callback registered before.
+func (mc *MqttConsole) AddCallbackOnNewConsole(cb OnNewConsole) {
+	mc.callbackOnNewConsole = cb
+}
+
+// countConnections returns the number of stored connections.
+func (mc *MqttConsole) countConnections() int {
+	size := 0
+	mc.connections.Range(func(_, _ any) bool {
+		size++
+		return true
+	})
+	return size
+}
+
+// NewMqttConsole creates a MqttConsole whose command, response and beacon
+// topics are derived from instanceID, then applies opts in order.
+func NewMqttConsole(mqttOption *mqtt.ClientOptions, instanceID string, opts ...MqttConsoleOption) *MqttConsole {
+	mc := &MqttConsole{chOut: make(chan outMessage, outQueueSize)}
+
+	txTopic := fmt.Sprintf(templateTxTopic, instanceID)
+	rxTopic := fmt.Sprintf(templateRxTopic, instanceID)
+	beaconTopic := fmt.Sprintf(templateBeaconEvent, instanceID)
+	version := fmt.Sprintf("mqtt-%s", mqttinfo.VERSION)
+
+	mc.mqttChat = mqttchat.NewChat(mqttOption, rxTopic, txTopic, version,
+		mqttchat.WithOptionBeaconTopic(beaconTopic, topicBeaconRequest))
+	mc.mqttChat.SetDataCallback(mc.onDataRx)
+
+	for _, opt := range opts {
+		opt(mc)
+	}
+	return mc
+}
+
+// Start launches the output pump in its own goroutine and then starts the
+// underlying chat.
+func (mc *MqttConsole) Start() {
+	log.Info("START - MQTT Console")
+	go mc.dataTx()
+	mc.mqttChat.Start()
+}
+
+// removeConsoleAndConnection forgets the console and connection of clientUUID.
+func (mc *MqttConsole) removeConsoleAndConnection(clientUUID string) {
+	log.Infof("Close mqtt connection with client: %s", clientUUID)
+	mc.connections.Delete(clientUUID)
+	mc.consoles.Delete(clientUUID)
+}
+
+// createNewConsoleAndConnection builds the connection and Console for
+// clientUUID, stores both and wires the console's close callback to
+// removeConsoleAndConnection.
+func (mc *MqttConsole) createNewConsoleAndConnection(clientUUID string) (*Console, *mqttConsoleConnection) {
+	conn := &mqttConsoleConnection{
+		uuid: clientUUID,
+		in:   make(chan []byte, inQueueSize),
+		out:  mc.chOut,
+	}
+	mc.connections.Store(clientUUID, conn)
+
+	consoleIO := struct {
+		io.ReadCloser
+		io.Writer
+		Flusher
+	}{
+		conn,
+		conn,
+		nil, // NOTE(review): Flusher is left nil; confirm Console never flushes this IO.
+	}
+
+	console := NewConsole(consoleIO, WithOptionCustomUUID(clientUUID))
+	console.AddCallbackOnClose(func() {
+		mc.removeConsoleAndConnection(clientUUID)
+	})
+
+	if mc.callbackOnNewConsole != nil {
+		mc.callbackOnNewConsole(console)
+	}
+
+	// NOTE(review): runs after the callback, so a timeout set there is presumably overwritten.
+	console.SetTimeout(mc.timeout)
+
+	mc.consoles.Store(clientUUID, console)
+
+	log.Infof("Open mqtt connection with client: %s", clientUUID)
+
+	return console, conn
+}
+
+// onDataRx handles one message from the chat. Messages with an empty command
+// UUID, command, data or client UUID are ignored. The first message of an
+// unknown client only opens its console (subject to maxConnections); later
+// messages are queued as console input.
+func (mc *MqttConsole) onDataRx(data mqttchat.MqttJsonData) {
+	if data.CmdUUID == "" || data.Cmd == "" || data.Data == "" || data.ClientUUID == "" {
+		return
+	}
+
+	if _, known := mc.consoles.Load(data.ClientUUID); !known {
+		if mc.maxConnections > 0 && mc.countConnections() >= mc.maxConnections {
+			log.Warn("Max number of connection reached")
+			return
+		}
+		newConsole, _ := mc.createNewConsoleAndConnection(data.ClientUUID)
+		mc.chOut <- outMessage{msg: " \r\n", clientUUID: data.ClientUUID, cmdUUID: data.CmdUUID}
+		newConsole.SetWelcomeMessage(defaultMqttConsoleWelcome)
+		newConsole.Start()
+		return
+	}
+
+	conn, found := mc.connections.Load(data.ClientUUID)
+	if !found || conn == nil {
+		log.Warn("Connection not found")
+		return
+	}
+
+	if !conn.(*mqttConsoleConnection).deliver(data.CmdUUID, []byte(data.Data)) {
+		log.Warnf("Dropping command for client %s: connection closed or input queue full", data.ClientUUID)
+	}
+}
+
+// dataTx forwards queued console output to the chat until chOut is closed.
+// Empty messages are skipped and every ">" is removed from the text before
+// it is transmitted.
+func (mc *MqttConsole) dataTx() {
+	for out := range mc.chOut {
+		if out.msg == "" {
+			continue
+		}
+		msg := out.msg
+		if msg == "\r\n" {
+			msg = ""
+		}
+		mc.mqttChat.Transmit(strings.ReplaceAll(msg, ">", ""), out.cmdUUID, out.clientUUID)
+	}
+}