serialAlox/message_handler.go

87 lines
1.9 KiB
Go

package main
import (
	"context"
	"log"
	"time"
)
// ReaderWriter is the byte transport a MessageHandler is attached to.
// Its two methods have the same signatures as io.Reader and io.Writer.
type ReaderWriter interface {
	// Read stores incoming bytes in b and reports how many were stored.
	Read(b []byte) (int, error)
	// Write sends p and reports how many bytes were written.
	Write(p []byte) (int, error)
}
// Parser is the incremental message decoder used by MessageHandler: raw
// bytes go in through ParseBytes and complete messages come out through
// GetNextMessage.
type Parser interface {
	// IsMessageAvailable reports whether GetNextMessage has a message to
	// hand out.
	IsMessageAvailable() bool
	// GetNextMessage returns the next message as its type byte and payload.
	GetNextMessage() (typeByte byte, payload []byte, err error)
	// ParseBytes feeds a chunk of raw bytes to the parser.
	// NOTE(review): the meaning of the returned count is defined by the
	// implementation; nothing in this file reads it.
	ParseBytes(p []byte) (int, error)
}
// HandlerFunc is a message callback. MessageHandler calls it with the
// message's type byte followed by the message payload.
type HandlerFunc func(typeByte byte, payload []byte)
// MessageHandler connects a byte transport to a message parser and
// dispatches every parsed message to the callback registered for its
// type byte.
type MessageHandler struct {
// WriterReader is the transport; readBytes pulls raw bytes from its Read.
WriterReader ReaderWriter
// Conf holds the handler's configuration. Nothing in the visible code
// reads it.
Conf MessageHandlerConfig
// MessageParser receives the raw bytes and is polled by ListenAndServe
// for complete messages.
MessageParser Parser
// handlers maps a message type byte to its callback. NewMessageHandler
// creates the map and RegisterMessageCallback fills it.
handlers map[byte]HandlerFunc
}
// MessageHandlerConfig carries the settings of a MessageHandler.
// It currently declares no fields.
type MessageHandlerConfig struct{}
// NewMessageHandlerConfig returns a pointer to a freshly allocated,
// zero-valued MessageHandlerConfig.
func NewMessageHandlerConfig() *MessageHandlerConfig {
	return new(MessageHandlerConfig)
}
// NewMessageHandler returns a MessageHandler that reads from con, keeps a
// copy of conf, decodes with msgParser, and starts with an empty callback
// table ready for RegisterMessageCallback.
func NewMessageHandler(con ReaderWriter, conf MessageHandlerConfig, msgParser Parser) *MessageHandler {
	mh := new(MessageHandler)
	mh.WriterReader = con
	mh.Conf = conf
	mh.MessageParser = msgParser
	mh.handlers = map[byte]HandlerFunc{}
	return mh
}
// RegisterMessageCallback installs fn as the callback for messages whose
// type byte equals typeByte, replacing any callback registered earlier for
// that type. Storing a nil fn leaves the type without a callback, because
// ListenAndServe skips nil entries.
//
// The callback table is created on first use, so registration also works
// on a MessageHandler that was not built with NewMessageHandler.
//
// The returned error is always nil; it is kept for interface stability.
//
// NOTE(review): the table is not guarded by a lock; register callbacks
// before calling ListenAndServe.
func (mh *MessageHandler) RegisterMessageCallback(typeByte byte, fn HandlerFunc) error {
	if mh.handlers == nil {
		// Writing to a nil map panics; a zero-value handler has no table yet.
		mh.handlers = make(map[byte]HandlerFunc)
	}
	mh.handlers[typeByte] = fn
	return nil
}
// ListenAndServe starts a background reader (readBytes) that feeds the
// parser, then dispatches every parsed message to the callback registered
// for its type byte. Messages whose type has no callback are logged and
// dropped.
//
// It blocks until one of the following happens:
//   - GetNextMessage returns an error (logged, then return), or
//   - the background reader stops, e.g. because Read failed. Messages
//     already parsed are dispatched first, then the reader's error is logged.
//
// Parser offers no way to wait for a message, so the parser is polled on a
// short ticker instead of in a tight loop.
//
// NOTE(review): the parser is used from this goroutine and from the reader
// goroutine at the same time; the Parser implementation must be safe for
// concurrent use.
func (mh *MessageHandler) ListenAndServe() {
	// pollInterval bounds the extra dispatch latency while idle.
	const pollInterval = 10 * time.Millisecond

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffered so the reader goroutine can always deliver its result and
	// exit, even if this function has already returned.
	readDone := make(chan error, 1)
	go func() { readDone <- mh.readBytes(ctx) }()

	// drain dispatches everything the parser currently holds. It reports
	// false when the parser fails to hand out a message.
	drain := func() bool {
		for mh.MessageParser.IsMessageAvailable() {
			msgType, msgPayload, err := mh.MessageParser.GetNextMessage()
			if err != nil {
				log.Printf("Error getting message %v", err)
				return false
			}
			log.Printf("Got messagetype % X, with payload % X", msgType, msgPayload)
			if fn := mh.handlers[msgType]; fn != nil {
				fn(msgType, msgPayload)
			}
		}
		return true
	}

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		if !drain() {
			return
		}
		select {
		case err := <-readDone:
			// The reader may have parsed more bytes since the last drain.
			drain()
			log.Printf("Reader stopped: %v", err)
			return
		case <-ticker.C:
		}
	}
}
// readBytes repeatedly reads from the transport into a 1024-byte buffer
// and feeds whatever arrived to the parser. It runs until ctx is cancelled
// or the transport's Read returns an error.
//
// It returns ctx.Err() when stopped by cancellation, otherwise the error
// reported by Read.
//
// Bytes delivered together with an error are parsed before the error is
// returned, as the io.Reader convention recommends. A ParseBytes failure is
// logged and reading continues. Reads that deliver no bytes are not
// forwarded to the parser.
//
// Cancellation is only checked between reads: a Read that is already
// blocked is not interrupted by ctx.
func (mh *MessageHandler) readBytes(ctx context.Context) error {
	buf := make([]byte, 1024)
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		n, readErr := mh.WriterReader.Read(buf)
		if n > 0 {
			if _, err := mh.MessageParser.ParseBytes(buf[:n]); err != nil {
				log.Printf("Error parsing bytes %v", err)
			}
		}
		if readErr != nil {
			return readErr
		}
	}
}