
AsyncQ

A simple worker task queue with async support

Installation

pip install async_q

Features

  1. Submit tasks asynchronously
  2. Task routing
  3. Distributed async worker processes

Use Case: Asynchronous Task Queue for I/O-Bound Task Processing

Overview

In scenarios where you need to execute I/O-bound tasks asynchronously within a single application, the "async_q" library provides a straightforward solution. This use case illustrates how to set up the application and use the library for that purpose.

Prerequisites

Before using the "async_q" library, ensure you have Python 3.8 and Redis (version 5.0 or later) installed, along with the required dependencies.
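
If a Redis server is not already running locally, one quick way to start one (assuming Docker is available; the container name below is arbitrary) is:

docker run -d --name async-q-redis -p 6379:6379 redis:7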

Setting Up the Application

1. Creating the Async Task Queue App

Begin by creating an instance of the AsyncTaskQueue to set up your application. In this example, we will name our app "async_q_app" and configure it to use Redis as the message broker.

# main_app.py

from async_q import AsyncTaskQueue, RedisBuilder

# Define Async Task Queue App
async_q_app = AsyncTaskQueue(
    redis_builder=RedisBuilder(
        port='6379',
    )
)

2. Defining the Task Function

Next, define a task function that will be submitted to the queue for processing. In this example, we have a task function named my_task. This function simulates I/O waiting with a specified delay.

# my_task.py

import logging
import asyncio
from async_q import submit_task

# Import the app instance to initialize the async task queue
from main_app import async_q_app

async def my_task(idx, delay=2, *args, **kwargs):
    # Simulate I/O waiting
    await asyncio.sleep(delay)

    logging.info(f'{idx} has finished the task. Task ID: {kwargs.get("task_id")}')

3. Submitting Tasks

To submit tasks for processing, you can use the submit_task function. In this example, we submit 20 tasks to be processed by the queue.

if __name__ == '__main__':
    for i in range(20):
        submit_task(my_task, kwargs={'idx': i, 'delay': 10})

The submit_task function also accepts a queue_name argument, which defaults to 'default'. This gives you the flexibility to route a particular task to a different queue; if no queue is specified, the task is placed on the default queue.

if __name__ == '__main__':
    for i in range(20):
        # Submit a task with an explicit queue name
        submit_task(my_task, kwargs={'idx': i, 'delay': 10}, queue_name='default')

Running Worker Processes

4. Initializing Worker Processes

To handle incoming tasks efficiently, set up and launch worker processes. The level of concurrency is controlled with the -c flag. The app can be provided in several ways:

  • File path form (original):
    python -m async_q -a main_app.py:async_q_app -c 5
  • Dotted module path (portable across machines):
    python -m async_q -a main_app:async_q_app -c 5
  • Environment variable (omit -a):
    # Bash
    export ASYNC_Q_APP=main_app:async_q_app
    python -m async_q -c 5

    # Windows PowerShell
    $env:ASYNC_Q_APP='main_app:async_q_app'
    python -m async_q -c 5

By default, these workers will process tasks from the default queue. You can assign specific workers to other queues as needed:

python -m async_q -a main_app:async_q_app -c 5 -q mail_ps
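
On the submission side, a task meant for that worker would be submitted with the matching queue name. Below is a minimal sketch reusing my_task from above; the file name route_task.py is illustrative, and mail_ps is just the example queue name used in the command above.

# route_task.py (illustrative sketch)

from async_q import submit_task

# my_task imports main_app, so the app is initialized on import
from my_task import my_task

if __name__ == '__main__':
    # Only workers started with `-q mail_ps` should pick up this task
    submit_task(my_task, kwargs={'idx': 0, 'delay': 5}, queue_name='mail_ps')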

These adjustments let you tailor the behavior of your worker processes to your application's specific requirements.

5. Submitting Tasks for Processing

With the worker processes running, you can now submit tasks for processing. Use the following command to execute the my_task.py script, which submits tasks to the queue.

$ python my_task.py

Result

The worker processes will asynchronously process the submitted tasks with the specified delays. You can monitor the progress and completion of tasks through the log messages generated by the my_task function. The "async_q" library is well suited for I/O-bound workloads that benefit from asynchronous processing within a single application.
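
For reference, the completion messages emitted by my_task would look roughly like the lines below; the exact prefix and task ID format depend on the worker's logging configuration, so treat this as an illustration only.

INFO: 0 has finished the task. Task ID: <task-id>
INFO: 1 has finished the task. Task ID: <task-id>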

Todo

  1. Test and verify backward compatibility
