import asyncio
import datetime
import logging
from typing import Optional
import asyncpg
import orjson
from aiohttp import web
from aiopq import AioPQ
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.DEBUG,
)
class TestQueue:
queue: Optional[AioPQ] = None
@classmethod
async def create_test_queue(cls):
if cls.queue:
return
pool = await asyncpg.create_pool(
dsn="postgres://user:123456@localhost:5432/aiopq?application_name=myapp",
max_size=3,
min_size=1,
)
aiopq_init = AioPQ(pool)
await aiopq_init.create()
cls.queue = aiopq_init
async def do_something_with_task_lol():
async for task in TestQueue.queue["lol"]:
if task:
# do something useful
pass
else:
continue
async def do_something_with_task_test():
async for task in TestQueue.queue["test"]:
if task:
# do something useful
pass
else:
continue
async def create_task(request):
data = await request.json()
scheduled_at = datetime.datetime.now(
tz=datetime.timezone(datetime.timedelta(hours=3))
) + datetime.timedelta(minutes=1)
task_queue = TestQueue.queue[data["queue_name"]]
task = await task_queue.put(data["task_name"], data, scheduled_at)
return web.Response(
body=orjson.dumps({"task_id": task}),
headers={"Content-Type": "applications/json"},
)
async def run():
app = web.Application()
app.add_routes([web.post("/create-task", create_task)])
await TestQueue.create_test_queue()
asyncio.create_task(do_something_with_task_test())
asyncio.create_task(do_something_with_task_lol())
return app
if __name__ == "__main__":
logger.debug("run")
web.run_app(run(), port=9000)
POST request to http://localhost:9000/create-task with json object:
{
"task_name": "some_task_name",
"queue_name": "lol"
}