+
Skip to content

goptics/redisq

Repository files navigation

Redisq

Go Reference Go Report Card Go Version CI codecov

A lightweight, thread-safe Redis-backed queue implementation in Go with support for distributed notifications and priority queuing.

Features

  • Thread-safe queue operations
  • Priority queue support
  • Distributed queue with real-time notifications
  • Automatic message expiration support
  • Graceful shutdown handling
  • Concurrent operations support
  • Simple API for queue operations

Installation

go get github.com/goptics/redisq

Quick Start

import "github.com/goptics/redisq"

func main() {
    // Initialize queue system
    qs := redisq.New("redis://localhost:6379")
    defer qs.Close()

    // Create a simple queue
    queue := qs.NewQueue("my-queue")
    defer queue.Close()

    // Enqueue items
    queue.Enqueue("hello world")

    // Dequeue items
    data, ok := queue.Dequeue()
    if ok {
        fmt.Println(string(data.([]byte)))
    }
}

Usage

Simple Queue

// Create a queue
queue := qs.NewQueue("my-queue")

// Optional: Set expiration for queue items
queue.SetExpiration(time.Hour)

// Enqueue items (supports []byte and string)
queue.Enqueue("test message")
queue.Enqueue([]byte("binary data"))

// Get queue length
length := queue.Len()

// Get all values
values := queue.Values()

// Purge the queue
queue.Purge()

Priority Queue

// Create a priority queue
pq := qs.NewPriorityQueue("priority-queue")

// Enqueue items with priority (lower number = higher priority)
pq.Enqueue("high priority", 1)
pq.Enqueue("medium priority", 2)
pq.Enqueue("low priority", 3)

// Items will be dequeued in priority order
data, ok := pq.Dequeue() // Returns "high priority"

Distributed Priority Queue

// Create a distributed priority queue
dpq := qs.NewDistributedPriorityQueue("distributed-priority-queue")

// Subscribe to notifications for queue and dequeue events
dpq.Subscribe(func(action string) {
    fmt.Printf("Action: %s\n", action)
})


// Enqueue with priority will trigger "enqueued" notification
dpq.Enqueue("important message", 1)

// Dequeue will trigger "dequeued" notification
data, ok := dpq.Dequeue()

Distributed Queue with Notifications

// Create a distributed queue
dq := qs.NewDistributedQueue("distributed-queue")

// Subscribe to notifications for queue and dequeue events
dq.Subscribe(func(action string) {
    fmt.Printf("Action: %s\n", action)
})

// Enqueue will trigger "enqueued" notification
dq.Enqueue("test message")

// Dequeue will trigger "dequeued" notification
data, ok := dq.Dequeue()

Queue with Acknowledgment (Reliable Processing)

// Create a queue with acknowledgment support
queue := qs.NewQueue("ack-queue")

// Set acknowledgment timeout (how long before unacknowledged items are requeued)
queue.SetAckTimeout(time.Minute * 5)

// Dequeue an item with a unique acknowledgment ID
ackID := "job-123"
item, ok := queue.Dequeue()
if ok {
    // Process the item
    processItem(item)

    // Mark the item as successfully processed
    queue.Acknowledge(ackID)
}

// For manual control of the acknowledgment process:
// 1. Prepare an item for future acknowledgment
ackID := "job-456"
data := "important job"
err := queue.PrepareForFutureAck(ackID, data)

// 2. Acknowledge the item after processing
success := queue.Acknowledge(ackID)

// You can trigger requeue of all unacknowledged items
queue.RequeueNackedItems()

// 4. Get count of pending unacknowledged items
pendingCount := queue.GetNackedItemsCount()

Configuration

If you have docker installed just do the following:

cp .env.example .env
docker compose up -d

you can change the REDIS_PORT in the .env file

Testing

go test -race -v ./...

Requirements

  • Go 1.24.1 or higher
  • Redis 6.0 or higher

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

👤 Author

License

This project is licensed under the MIT License - see the LICENSE file for details.

About

A redis based queue written in go. supports distributions

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载