[go: up one dir, main page]

Skip to content

pkonnov/aiopq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 

Repository files navigation

Example:

import asyncio
import datetime
import logging
from typing import Optional

import asyncpg
import orjson
from aiohttp import web

from aiopq import AioPQ

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG,
)


class TestQueue:

    queue: Optional[AioPQ] = None

    @classmethod
    async def create_test_queue(cls):

        if cls.queue:
            return

        pool = await asyncpg.create_pool(
            dsn="postgres://user:123456@localhost:5432/aiopq?application_name=myapp",
            max_size=3,
            min_size=1,
        )
        aiopq_init = AioPQ(pool)
        await aiopq_init.create()
        cls.queue = aiopq_init


async def do_something_with_task_lol():
    async for task in TestQueue.queue["lol"]:
        if task:
            # do something useful
            pass
        else:
            continue


async def do_something_with_task_test():
    async for task in TestQueue.queue["test"]:
        if task:
            # do something useful
            pass
        else:
            continue


async def create_task(request):
    data = await request.json()
    scheduled_at = datetime.datetime.now(
        tz=datetime.timezone(datetime.timedelta(hours=3))
    ) + datetime.timedelta(minutes=1)
    task_queue = TestQueue.queue[data["queue_name"]]
    task = await task_queue.put(data["task_name"], data, scheduled_at)
    return web.Response(
        body=orjson.dumps({"task_id": task}),
        headers={"Content-Type": "applications/json"},
    )


async def run():
    app = web.Application()
    app.add_routes([web.post("/create-task", create_task)])

    await TestQueue.create_test_queue()

    asyncio.create_task(do_something_with_task_test())
    asyncio.create_task(do_something_with_task_lol())

    return app


if __name__ == "__main__":
    logger.debug("run")
    web.run_app(run(), port=9000)

POST request to http://localhost:9000/create-task with json object:

{
  "task_name": "some_task_name",
  "queue_name": "lol"
}

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages