8000 Handle SetStreamMetadata idempotently by damianh · Pull Request #129 · SQLStreamStore/SQLStreamStore · GitHub
[go: up one dir, main page]

Skip to content
This repository was archived by the owner on Aug 15, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,5 +279,26 @@ await store
}
}
}


[Fact, Trait("Category", "StreamMetadata")]
public async Task When_set_metadata_with_same_data_then_should_handle_idempotently()
{
using (var fixture = GetFixture())
{
using (var store = await fixture.GetStreamStore())
{
const string streamId = "stream-1";
await store
.SetStreamMetadata(streamId, maxCount: 2, maxAge: 30, metadataJson: "meta");
await store
.SetStreamMetadata(streamId, maxCount: 2, maxAge: 30, metadataJson: "meta");

var metadata = await store.GetStreamMetadata(streamId);

metadata.MetadataStreamVersion.ShouldBe(0);
8000 Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary test. StreamVersion should not have been incremented and stay at 0.

}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
<ItemGroup>
<Compile Include="..\SqlStreamStore.AcceptanceTests\*.cs" />
</ItemGroup>
<ItemGroup>
<None Remove="SqlStreamStore.MsSql.Tests.v3.ncrunchproject" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\SqlStreamStore.MsSql\SqlStre 8000 amStore.MsSql.csproj" />
</ItemGroup>
Expand Down
3 changes: 3 additions & 0 deletions src/SqlStreamStore.MsSql/MsSqlStreamStore.AppendStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ private async Task<MsSqlAppendResult> AppendToStreamExpectedVersionAny(
false,
null,
connection,
transaction,
cancellationToken)
.NotOnCapturedContext();

Expand Down Expand Up @@ -288,6 +289,7 @@ private async Task<MsSqlAppendResult> AppendToStreamExpectedVersionNoStream(
false,
null,
connection,
transaction,
cancellationToken)
.NotOnCapturedContext();

Expand Down Expand Up @@ -387,6 +389,7 @@ private async Task<MsSqlAppendResult> AppendToStreamExpectedVersion(
false,
null,
connection,
transaction,
cancellationToken);

if(messages.Length > page.Messages.Length)
Expand Down
10 changes: 6 additions & 4 deletions src/SqlStreamStore.MsSql/MsSqlStreamStore.ReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected override async Task<ReadStreamPage> ReadStreamForwardsInternal(
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
var streamIdInfo = new StreamIdInfo(streamId);
return await ReadStreamInternal(streamIdInfo.SqlStreamId, start, count, ReadDirection.Forward,
prefetch, readNext, connection, cancellationToken);
prefetch, readNext, connection, null, cancellationToken);
}
}

Expand All @@ -41,7 +41,7 @@ protected override async Task<ReadStreamPage> ReadStreamBackwardsInternal(
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
var streamIdInfo = new StreamIdInfo(streamId);
return await ReadStreamInternal(streamIdInfo.SqlStreamId, start, count, ReadDirection.Backward,
prefetch, readNext, connection, cancellationToken);
prefetch, readNext, connection, null, cancellationToken);
}
}

Expand All @@ -52,7 +52,9 @@ private async Task<ReadStreamPage> ReadStreamInternal(
ReadDirection direction,
bool prefetch,
ReadNextStreamPage readNext,
SqlConnection connection, CancellationToken cancellationToken)
SqlConnection connection,
SqlTransaction transaction,
CancellationToken cancellationToken)
{
// If the count is int.MaxValue, TSql will see it as a negative number.
// Users shouldn't be using int.MaxValue in the first place anyway.
Expand Down Expand Up @@ -87,7 +89,7 @@ private async Task<ReadStreamPage> ReadStreamInternal(
};
}

using(var command = new SqlCommand(commandText, connection))
using (var command = new SqlCommand(commandText, connection, transaction))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to pass this down because command wouldn't work if in a transaction scope. In other paths, this is null, which is fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not completely following - are you saying that particular overload needs to be called even if transaction is null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this transcation

using(var transaction = connection.BeginTransaction())
was getting associated with the connection, if I didn't pass it to this sql command I was getting sql exception.

System.InvalidOperationException: BeginExecuteReader requires the command to have a transaction when the connection assigned to the command is in a pending local transaction.  The Transaction property of the command has not been initialized.
   at System.Data.SqlClient.SqlCommand.ValidateCommand(Boolean async, String method)
   at System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, TaskCompletionSource`1 completion, Int32 timeout, Task& task, Boolean asyncWrite, String method)
   at System.Data.SqlClient.SqlCommand.BeginExecuteReader(CommandBehavior behavior, AsyncCallback callback, Object stateObject)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncImpl[TArg1](Func`4 beginMethod, Func`2 endFunction, Action`1 endAction, TArg1 arg1, Object state, TaskCreationOptions creationOptions)
   at System.Threading.Tasks.TaskFactory`1.FromAsync[TArg1](Func`4 beginMethod, Func`2 endMethod, TArg1 arg1, Object state)
   at System.Data.SqlClient.SqlCommand.ExecuteReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)

{
command.Parameters.AddWithValue("streamId", sqlStreamId.Id);
command.Parameters.AddWithValue("count", count + 1); //Read extra row to see if at end or not
Expand Down
7 changes: 4 additions & 3 deletions src/SqlStreamStore.MsSql/MsSqlStreamStore.StreamMetadata.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace SqlStreamStore
{
using System;
using System.Threading;
using System.Threading.Tasks;
using SqlStreamStore.Streams;
Expand All @@ -27,6 +26,7 @@ protected override async Task<StreamMetadataResult> GetStreamMetadataInternal(
true,
null,
connection,
null,
cancellationToken);
}

Expand Down Expand Up @@ -70,14 +70,15 @@ protected override async Task<SetStreamMetadataResult> SetStreamMetadataInternal
MetaJson = metadataJson
};
var json = SimpleJson.SerializeObject(metadataMessage);
var newmessage = new NewStreamMessage(Guid.NewGuid(), "$stream-metadata", json);
var messageId = MetadataMessageIdGenerator.Create(json);
var message = new NewStreamMessage(messageId, "$stream-metadata", json);

result = await AppendToStreamInternal(
connection,
transaction,
streamIdInfo.MetadataSqlStreamId,
expectedStreamMetadataVersion,
new[] { newmessage },
new[] { message },
cancellationToken);

transaction.Commit();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace SqlStreamStore.Infrastructure
{
using System;
using Shouldly;
using Xunit;

public class DeterministicGuidGeneratorTests
{
[Fact]
public void Given_same_input_should_generate_same_Guid()
{
var generator = new DeterministicGuidGenerator(Guid.NewGuid());
var guid1 = generator.Create("some-data");
var guid2 = generator.Create("some-data");

guid2.ShouldBe(guid1);
}

[Fact]
public void Given_different_input_should_generate_different_Guid()
{
var generator = new DeterministicGuidGenerator(Guid.NewGuid());
var guid1 = generator.Create("some-data");
var guid2 = generator.Create("other-data");

guid2.ShouldNotBe(guid1);
}
}
}
5 changes: 3 additions & 2 deletions src/SqlStreamStore/InMemory/InMemoryStreamStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,10 @@ protected override async Task<SetStreamMetadataResult> SetStreamMetadataInternal
MetaJson = metadataJson
};
var json = SimpleJson.SerializeObject(metadataMessage);
var newmessage = new NewStreamMessage(Guid.NewGuid(), "$stream-metadata", json);
var messageId = MetadataMessageIdGenerator.Create(json);
var newStreamMessage = new NewStreamMessage(messageId, "$stream-metadata", json);

var result = AppendToStreamInternal(metaStreamId, expectedStreamMetadataVersion, new[] { newmessage });
var result = AppendToStreamInternal(metaStreamId, expectedStreamMetadataVersion, new[] { newStreamMessage });

await CheckStreamMaxCount(streamId, metadataMessage.MaxCount, cancellationToken);

Expand Down
92 changes: 92 additions & 0 deletions src/SqlStreamStore/Infrastructure/DeterministicGuidGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
namespace SqlStreamStore.Infrastructure
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Adapted from https://github.com/LogosBible/Logos.Utility/blob/master/src/Logos.Utility/GuidUtility.cs
// MIT Licence

/// <summary>
/// A helper utility to generate deterministed GUIDS.
/// </summary>
public class DeterministicGuidGenerator
{
private readonly byte[] _namespaceBytes;

/// <summary>
/// Initializes a new instance of <see cref="DeterministicGuidGenerator"/>
/// </summary>
/// <param name="guidNameSpace">
/// A namespace that ensures that the GUID generated with this instance
/// do not collided with other generators. Your application should define
/// it's namespace as a constant.
/// </param>
public DeterministicGuidGenerator(Guid guidNameSpace)
{
_namespaceBytes = guidNameSpace.ToByteArray();
SwapByteOrder(_namespaceBytes);
}

/// <summary>
/// Creates a deterministic GUID.
/// </summary>
/// <param name="source">
/// A source to generate the GUID from.
/// </param>
/// <returns>
/// A deterministically generated GUID.
/// </returns>
public Guid Create(string source)
{
return Create(Encoding.UTF8.GetBytes(source));
}

/// <summary>
/// Creates a deterministic GUID.
/// </summary>
/// <param name="source">
/// A source to generate the GUID from.
/// </param>
/// <returns>
/// A deterministically generated GUID.
/// </returns>
public Guid Create(byte[] source)
{
byte[] hash;
using (var algorithm = SHA1.Create())
{
algorithm.TransformBlock(_namespaceBytes, 0, _namespaceBytes.Length, null, 0);
algorithm.TransformFinalBlock(source, 0, source.Length);

hash = algorithm.Hash;
}

var newGuid = new byte[16];
Array.Copy(hash, 0, newGuid, 0, 16);

newGuid[6] = (byte)((newGuid[6] & 0x0F) | (5 << 4));
newGuid[8] = (byte)((newGuid[8] & 0x3F) | 0x80);

SwapByteOrder(newGuid);
return new Guid(newGuid);
}

private static void SwapByteOrder(byte[] guid)
{
SwapBytes(guid, 0, 3);
SwapBytes(guid, 1, 2);
SwapBytes(guid, 4, 5);
SwapBytes(guid, 6, 7);
}

private static void SwapBytes(byte[] guid, int left, int right)
{
var temp = guid[left];
guid[left] = guid[right];
guid[right] = temp;
}
}
}
32 changes: 32 additions & 0 deletions src/SqlStreamStore/Infrastructure/MetadataMessageIdGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace SqlStreamStore.Infrastructure
{
using System;

/// <summary>
/// A deterministic GUID generator for metadata messages.
/// </summary>
public static class MetadataMessageIdGenerator
{
private static readonly DeterministicGuidGenerator s_deterministicGuidGenerator;

static MetadataMessageIdGenerator()
{
s_deterministicGuidGenerator
= new DeterministicGuidGenerator(Guid.Parse("8D1E0B02-0D78-408E-8211-F899BE6F8AA2"));
}

/// <summary> 47C2 ;
/// Create a GUID for metadata message Ids.
/// </summary>
/// <param name="message">
/// The metadata message uses as input into the generation algorithim.
/// </param>
/// <returns>
/// A deterministically generated GUID.
/// </returns>
public static Guid Create(string message)
{
return s_deterministicGuidGenerator.Create(message);
}
}
}
0