A lightweight, thread-safe Redis-backed queue implementation in Go with support for distributed notifications and priority queuing.
- Thread-safe queue operations
- Priority queue support
- Distributed queue with real-time notifications
- Automatic message expiration support
- Graceful shutdown handling
- Concurrent operations support
- Simple API for queue operations
go get github.com/goptics/redisq
import "github.com/goptics/redisq"
func main() {
// Initialize queue system
qs := redisq.New("redis://localhost:6379")
defer qs.Close()
// Create a simple queue
queue := qs.NewQueue("my-queue")
defer queue.Close()
// Enqueue items
queue.Enqueue("hello world")
// Dequeue items
data, ok := queue.Dequeue()
if ok {
fmt.Println(string(data.([]byte)))
}
}
// Create a queue
queue := qs.NewQueue("my-queue")
// Optional: Set expiration for queue items
queue.SetExpiration(time.Hour)
// Enqueue items (supports []byte and string)
queue.Enqueue("test message")
queue.Enqueue([]byte("binary data"))
// Get queue length
length := queue.Len()
// Get all values
values := queue.Values()
// Purge the queue
queue.Purge()
// Create a priority queue
pq := qs.NewPriorityQueue("priority-queue")
// Enqueue items with priority (lower number = higher priority)
pq.Enqueue("high priority", 1)
pq.Enqueue("medium priority", 2)
pq.Enqueue("low priority", 3)
// Items will be dequeued in priority order
data, ok := pq.Dequeue() // Returns "high priority"
// Create a distributed priority queue
dpq := qs.NewDistributedPriorityQueue("distributed-priority-queue")
// Subscribe to notifications for enqueue and dequeue events
dpq.Subscribe(func(action string) {
fmt.Printf("Action: %s\n", action)
})
// Enqueue with priority will trigger "enqueued" notification
dpq.Enqueue("important message", 1)
// Dequeue will trigger "dequeued" notification
data, ok := dpq.Dequeue()
// Create a distributed queue
dq := qs.NewDistributedQueue("distributed-queue")
// Subscribe to notifications for enqueue and dequeue events
dq.Subscribe(func(action string) {
fmt.Printf("Action: %s\n", action)
})
// Enqueue will trigger "enqueued" notification
dq.Enqueue("test message")
// Dequeue will trigger "dequeued" notification
data, ok := dq.Dequeue()
// Create a queue with acknowledgment support
queue := qs.NewQueue("ack-queue")
// Set acknowledgment timeout (how long before unacknowledged items are requeued)
queue.SetAckTimeout(time.Minute * 5)
// Dequeue an item with a unique acknowledgment ID
ackID := "job-123"
item, ok := queue.Dequeue()
if ok {
// Process the item
processItem(item)
// Mark the item as successfully processed
queue.Acknowledge(ackID)
}
// For manual control of the acknowledgment process:
// 1. Prepare an item for future acknowledgment
ackID := "job-456"
data := "important job"
err := queue.PrepareForFutureAck(ackID, data)
// 2. Acknowledge the item after processing
success := queue.Acknowledge(ackID)
// 3. Trigger a requeue of all unacknowledged items
queue.RequeueNackedItems()
// 4. Get count of pending unacknowledged items
pendingCount := queue.GetNackedItemsCount()
If you have Docker installed, run the following:
cp .env.example .env
docker compose up -d
You can change `REDIS_PORT` in the `.env` file.
go test -race -v ./...
- Go 1.24.1 or higher
- Redis 6.0 or higher
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
- GitHub: @fahimfaisaal
- LinkedIn: in/fahimfaisaal
- Twitter: @FahimFaisaal
This project is licensed under the MIT License - see the LICENSE file for details.