package main

import (
	"context"
	"fmt"
	"log"
)

// ReaderWriter is the byte transport a MessageHandler reads from. Its methods
// have the same signatures as io.Writer and io.Reader.
type ReaderWriter interface {
	Write(p []byte) (n int, err error)
	Read(b []byte) (n int, err error)
}

// Parser consumes raw bytes and hands back complete messages.
type Parser interface {
	// ParseBytes is given every chunk of bytes read from the transport.
	ParseBytes(p []byte) (n int, err error)
	// IsMessageAvailable is polled before each call to GetNextMessage;
	// MessageHandler only calls GetNextMessage after it has returned true.
	IsMessageAvailable() bool
	// GetNextMessage returns the type byte and payload of the next message.
	// A non-nil error makes MessageHandler stop serving.
	GetNextMessage() (typeByte byte, payload []byte, err error)
}

// HandlerFunc is a callback invoked with the type byte and payload of a
// received message.
type HandlerFunc func(byte, []byte)

// MessageHandler reads bytes from a ReaderWriter, feeds them to a Parser and
// dispatches each resulting message to the callback registered for its type.
type MessageHandler struct {
	WriterReader  ReaderWriter
	Conf          MessageHandlerConfig
	MessageParser Parser
	// handlers maps a message type byte to its callback. It is not guarded by
	// a lock, so callbacks should be registered before ListenAndServe runs.
	handlers map[byte]HandlerFunc
}

// MessageHandlerConfig holds MessageHandler settings. It currently has no
// fields.
type MessageHandlerConfig struct {
}

// NewMessageHandlerConfig returns a pointer to an empty MessageHandlerConfig.
func NewMessageHandlerConfig() *MessageHandlerConfig {
	return &MessageHandlerConfig{}
}

// NewMessageHandler returns a MessageHandler that reads from con, parses with
// msgParser and starts with no registered callbacks.
func NewMessageHandler(con ReaderWriter, conf MessageHandlerConfig, msgParser Parser) *MessageHandler {
	return &MessageHandler{
		WriterReader:  con,
		Conf:          conf,
		MessageParser: msgParser,
		handlers:      make(map[byte]HandlerFunc),
	}
}

// RegisterMessageCallback installs fn as the callback for messages whose type
// byte is typeByte, replacing any earlier registration. Registering a nil fn
// causes messages of that type to be skipped. It always returns nil.
func (mh *MessageHandler) RegisterMessageCallback(typeByte byte, fn HandlerFunc) error {
	mh.handlers[typeByte] = fn
	return nil
}

// ListenAndServe reads from the transport, parses what it reads and calls the
// registered callback for every completed message. It blocks until the
// transport returns a read error or the parser fails to produce a message,
// logs the cause once and returns.
//
// Reading, parsing and dispatching all happen on the calling goroutine, so
// the Parser is never used concurrently and the loop blocks in Read instead
// of spinning while no data is available.
func (mh *MessageHandler) ListenAndServe() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := mh.readBytes(ctx); err != nil {
		log.Printf("Stopped serving: %v", err)
	}
}

// readBytes is the serve loop. Each iteration reads one chunk from the
// transport, passes it to the parser and dispatches every message that has
// become available. It returns ctx.Err() if ctx is done between reads, and
// otherwise the wrapped error that ended the loop.
func (mh *MessageHandler) readBytes(ctx context.Context) error {
	// Deliver anything the parser already holds before blocking in Read.
	if err := mh.dispatchPending(); err != nil {
		return err
	}

	buffer := make([]byte, 1024)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		n, readErr := mh.WriterReader.Read(buffer)
		// Bytes returned alongside an error are still valid data, so they are
		// parsed before the error is acted upon.
		if n > 0 {
			if _, err := mh.MessageParser.ParseBytes(buffer[:n]); err != nil {
				// A parse failure is reported but does not end the loop: the
				// transport is still usable and later bytes may parse.
				log.Printf("Error parsing bytes %v", err)
			}
			if err := mh.dispatchPending(); err != nil {
				return err
			}
		}
		if readErr != nil {
			return fmt.Errorf("reading bytes: %w", readErr)
		}
	}
}

// dispatchPending drains every message the parser currently reports as
// available and calls the matching callback. Messages with no callback (or a
// nil one) are logged and dropped.
func (mh *MessageHandler) dispatchPending() error {
	for mh.MessageParser.IsMessageAvailable() {
		msgType, msgPayload, err := mh.MessageParser.GetNextMessage()
		if err != nil {
			return fmt.Errorf("getting message: %w", err)
		}
		log.Printf("Got messagetype % X, with payload % X", msgType, msgPayload)
		if fn := mh.handlers[msgType]; fn != nil {
			fn(msgType, msgPayload)
		}
	}
	return nil
}