[go: up one dir, main page]

Skip to content

Commit

Permalink
✨ feat: add RequeueMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
mic1on committed Mar 26, 2024
1 parent e6564d1 commit 8ce3266
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "onestep"
version = "0.3.8"
version = "0.3.81"
description = ""
authors = ["miclon <jcnd@163.com>"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion src/onestep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@
'__version__'
]

__version__ = '0.3.8'
__version__ = '0.3.81'
8 changes: 8 additions & 0 deletions src/onestep/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@ class RetryInLocal(RetryException):

class DropMessage(Exception):
"""从 Brokers 中 丢弃该消息"""


class RejectMessage(DropMessage):
"""从 Brokers 中 丢弃该消息"""


class RequeueMessage(Exception):
"""重新入队该消息"""
1 change: 1 addition & 0 deletions src/onestep/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
message_consumed = signal("message_consumed")
message_error = signal("message_error")
message_drop = signal("message_drop")
message_requeue = signal("message_requeue")
10 changes: 7 additions & 3 deletions src/onestep/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from .message import Message
from .retry import RetryStatus
from .broker import BaseBroker
from .exception import DropMessage
from .signal import message_received, message_consumed, message_error, message_drop
from . import exception
from .signal import message_received, message_consumed, message_error, message_drop, message_requeue

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -82,10 +82,14 @@ def handle_message(self, message: Message):
message.confirm()

self.instance.after_emit("consume", message=message)
except DropMessage as e:
except (exception.DropMessage, exception.RejectMessage) as e:
message_drop.send(self, message=message, reason=e)
logger.warning(f"{self.instance.name} dropped <{type(e).__name__}: {str(e)}>")
message.reject()
except exception.RequeueMessage as e:
message_requeue.send(self, message=message, reason=e)
logger.warning(f"{self.instance.name} requeue <{type(e).__name__}: {str(e)}>")
message.requeue(is_source=True)
except Exception as e:
message_error.send(self, message=message, error=e)
if self.instance.state.debug:
Expand Down

0 comments on commit 8ce3266

Please sign in to comment.