8000 Inbox consumer breaking changes. Now inbox consumer uses same scope a… by HaikAsatryan · Pull Request #14 · PandaTechAM/be-lib-masstransit-postgres-outbox · GitHub
[go: up one dir, main page]

Skip to content

Inbox consumer breaking changes. Now inbox consumer uses same scope a… #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2">
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.2">
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
using MassTransit.PostgresOutbox.Abstractions;
using MassTransit.PostgresOutbox.Demo.Consumer.Contexts;
using MassTransit.PostgresOutbox.Demo.Shared.Events;
using Microsoft.EntityFrameworkCore.Storage;

namespace MassTransit.PostgresOutbox.Demo.Consumer.Services;

public class ConsumeService(ConsumerContext dbContext, IServiceScopeFactory serviceScopeFactory)
: InboxConsumer<ComplexObjectEvent, ConsumerContext>(serviceScopeFactory)
public class ConsumeService(ConsumerContext dbContext, IServiceProvider sp)
: InboxConsumer<ComplexObjectEvent, ConsumerContext>(sp)
{
protected override Task Consume(ComplexObjectEvent message)
protected override Task Consume(ComplexObjectEvent message, IDbContextTransaction dbContextTransaction)
{
var original = ComplexObjectEvent.Init();
var match = message.Equals(original);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.6" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.4.0" />
<ProjectReference Include="..\src\MassTransit.PostgresOutbox\MassTransit.PostgresOutbox.csproj" />
</ItemGroup>

Expand Down
6 changes: 3 additions & 3 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ public class YourConsumer : InboxConsumer<YourMessage, PostgresContext>
{
private readonly PostgresContext _context;

public YourConsumer(PostgresContext dbContext, IServiceScopeFactory serviceScopeFactory)
: base(serviceScopeFactory)
public YourConsumer(PostgresContext dbContext, IServiceProvider sp)
: base(sp)
{
_context = dbContext;
}

public override async Task Consume(YourMessage message)
public override async Task Consume(YourMessage message, IDbContextTransaction transaction)
{
// Implement your message processing logic here
}
Expand Down
37 changes: 20 additions & 17 deletions src/MassTransit.PostgresOutbox/Abstractions/InboxConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using MassTransit.PostgresOutbox.Entities;
using MassTransit.PostgresOutbox.Enums;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

Expand All @@ -14,23 +15,20 @@ public abstract class InboxConsumer<TMessage, TDbContext> : IConsumer<TMessage>
where TDbContext : DbContext, IInboxDbContext
{
private readonly string _consumerId;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IServiceProvider _sp;

protected InboxConsumer(IServiceScopeFactory serviceScopeFactory)
protected InboxConsumer(IServiceProvider sp)
{
_consumerId = GetType()
.ToString();
_serviceScopeFactory = serviceScopeFactory;
_sp = sp;
}

public async Task Consume(ConsumeContext<TMessage> context)
{
var messageId = context.Headers.Get<Guid>(Constants.OutboxMessageId) ?? context.MessageId;

using var scope = _serviceScopeFactory.CreateScope();

var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
var logger = scope.ServiceProvider.GetRequiredService<ILogger<InboxConsumer<TMessage, TDbContext>>>();
var dbContext = _sp.GetRequiredService<TDbContext>();
var logger = _sp.GetRequiredService<ILogger<InboxConsumer<TMessage, TDbContext>>>();

var exists =
await dbContext.InboxMessages.AnyAsync(x => x.MessageId == messageId && x.ConsumerId == _consumerId);
Expand All @@ -57,29 +55,34 @@ public async Task Consume(ConsumeContext<TMessage> context)
.Where(x => x.State == MessageState.New)
.ForUpdate(LockBehavior.SkipLocked)
.FirstOrDefaultAsync();

if (inboxMessage == null)
{
return;
}

try
{
await Consume(context.Message);
await Consume(context.Message, transactionScope);

inboxMessage.State = MessageState.Done;
inboxMessage.UpdatedAt = DateTime.UtcNow;

await dbContext.SaveChangesAsync();
await transactionScope.CommitAsync();
}
catch (Exception ex)
{
logger.LogError(ex, "Exception thrown while consuming message");
throw;
}
finally
{
logger.LogError(ex, "Exception thrown while consuming message {messageId} by {consumerId}",
messageId,
_consumerId);

await transactionScope.RollbackAsync();

inboxMessage.UpdatedAt = DateTime.UtcNow;
await dbContext.SaveChangesAsync();
await transactionScope.CommitAsync();
throw;
}
}

protected abstract Task Consume(TMessage message);
protected abstract Task Consume(TMessage message, IDbContextTransaction transactionScope);
}
12 changes: 6 additions & 6 deletions src/MassTransit.PostgresOutbox/MassTransit.PostgresOutbox.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
<PackageReadmeFile>Readme.md</PackageReadmeFile>
<Authors>Pandatech</Authors>
<Copyright>MIT</Copyright>
<Version>2.0.3</Version>
<Version>3.0.0</Version>
<PackageId>Pandatech.MassTransit.PostgresOutbox</PackageId>
<Title>Pandatech MassTransit PostgreSQL Outbox Extension</Title>
<PackageTags>Pandatech, library, postgres, distributed systems, microservices, modular monolith, messaging, efcore, mass transit, outbox pattern, inbox pattern</PackageTags>
<Description>Pandatech.MassTransit.PostgresOutbox extends MassTransit to offer advanced message handling capabilities for distributed systems. With first-class support for multiple DbContexts, this library integrates seamlessly with Entity Framework Core and PostgreSQL, providing reliable Outbox and Inbox patterns. It ensures consistent message delivery and processing in complex microservices architectures, leveraging PostgreSQL's ForUpdate feature to handle concurrency with ease.</Description>
<RepositoryUrl>https://github.com/PandaTechAM/be-lib-pandatech-masstransit-postgres-outbox</RepositoryUrl>
<PackageReleaseNotes>Nuget updates</PackageReleaseNotes>
<PackageReleaseNotes>Inbox consumer breaking changes. Now inbox consumer uses same scope and works within db transaction to make inbox maximum atomic,</PackageReleaseNotes>
</PropertyGroup>

<ItemGroup>
Expand All @@ -23,10 +23,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MassTransit" Version="8.3.6" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
<PackageReference Include="Pandatech.EFCore.PostgresExtensions" Version="4.0.1" />
<PackageReference Include="MassTransit" Version="8.4.0" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
<PackageReference Include="Pandatech.EFCore.PostgresExtensions" Version="5.1.0" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2">
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.2">
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="8.1.0" />
</ItemGroup>

<ItemGroup>
Expand Down
0