From 9439f9edd32cb7bc467b8db4973fbb71e73c17ff Mon Sep 17 00:00:00 2001 From: yreynhout Date: Thu, 2 Jul 2020 16:24:19 +0200 Subject: [PATCH] Started work on reading the head message of a particular stream or the all stream. --- src/SqlStreamStore/IReadonlyStreamStore.cs | 26 +++++++- .../InMemory/InMemoryStreamStore.cs | 29 +++++++++ .../Infrastructure/ReadonlyStreamStoreBase.cs | 18 ++++++ .../AcceptanceTests.ReadHeadCheckpoint.cs | 60 +++++++++++++++++++ 4 files changed, 132 insertions(+), 1 deletion(-) diff --git a/src/SqlStreamStore/IReadonlyStreamStore.cs b/src/SqlStreamStore/IReadonlyStreamStore.cs index 5039c022a..531766d3b 100644 --- a/src/SqlStreamStore/IReadonlyStreamStore.cs +++ b/src/SqlStreamStore/IReadonlyStreamStore.cs @@ -210,9 +210,19 @@ IAllStreamSubscription SubscribeToAll( /// The cancellation instruction. /// /// - /// The head position. + /// The head position, or -1 if there is none. /// Task ReadHeadPosition(CancellationToken cancellationToken = default); + /// + /// Reads the head message (the very latest message). + /// + /// + /// The cancellation instruction. + /// + /// + /// The head message, or null if there is no head message. + /// + Task ReadHeadMessage(CancellationToken cancellationToken = default); /// /// Reads the head position (the position of the very latest message) of a particular stream. @@ -242,6 +252,20 @@ IAllStreamSubscription SubscribeToAll( /// Task ReadStreamHeadVersion(StreamId streamId, CancellationToken cancellationToken = default); + /// + /// Reads the head message (the very latest message) of a particular stream. + /// + /// + /// The stream to read the head message of. + /// + /// + /// The cancellation instruction. + /// + /// + /// The head message of the stream, or null if no such stream or an empty stream. + /// + Task ReadStreamHeadMessage(StreamId streamId, CancellationToken cancellationToken = default); + /// /// Gets the stream metadata. /// diff --git a/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs b/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs index aa9fd88ed..9f2307b6c 100644 --- a/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs +++ b/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs @@ -713,6 +713,35 @@ protected override Task ReadStreamHeadVersionInternal(string streamId, Canc } } + protected override Task ReadHeadMessageInternal(CancellationToken cancellationToken) + { + var message = _allStream.LastOrDefault(); + return message == null + ? Task.FromResult(default) + : Task.FromResult( + new StreamMessage( + message.StreamId, message.MessageId, + message.StreamVersion, message.Position, + message.Created, message.Type, + message.JsonMetadata, message.JsonData)); + } + + protected override Task ReadStreamHeadMessageInternal(string streamId, CancellationToken cancellationToken) + { + using(_lock.UseReadLock()) + { + if(!_streams.TryGetValue(streamId, out InMemoryStream stream)) + return Task.FromResult(default); + var message = stream.Messages[stream.Messages.Count - 1]; + return Task.FromResult( + new StreamMessage( + message.StreamId, message.MessageId, + message.StreamVersion, message.Position, + message.Created, message.Type, + message.JsonMetadata, message.JsonData)); + } + } + protected override IAllStreamSubscription SubscribeToAllInternal( long? fromPosition, AllStreamMessageReceived streamMessageReceived, diff --git a/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs b/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs index 3d780fe02..7a03a55b8 100644 --- a/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs +++ b/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs @@ -240,6 +240,22 @@ public Task ReadStreamHeadPosition(StreamId streamId, CancellationToken ca return ReadStreamHeadPositionInternal(streamId, cancellationToken); } + public Task ReadHeadMessage(CancellationToken cancellationToken = default) + { + GuardAgainstDisposed(); + + return ReadHeadMessageInternal(cancellationToken); + } + + public Task ReadStreamHeadMessage(StreamId streamId, CancellationToken cancellationToken = default) + { + if (streamId == null) throw new ArgumentNullException(nameof(streamId)); + + GuardAgainstDisposed(); + + return ReadStreamHeadMessageInternal(streamId, cancellationToken); + } + public Task ReadStreamHeadVersion(StreamId streamId, CancellationToken cancellationToken = default) { if (streamId == null) throw new ArgumentNullException(nameof(streamId)); @@ -318,6 +334,8 @@ protected abstract Task ReadStreamBackwardsInternal( protected abstract Task ReadHeadPositionInternal(CancellationToken cancellationToken); protected abstract Task ReadStreamHeadPositionInternal(string streamId, CancellationToken cancellationToken); protected abstract Task ReadStreamHeadVersionInternal(string streamId, CancellationToken cancellationToken); + protected abstract Task ReadHeadMessageInternal(CancellationToken cancellationToken); + protected abstract Task ReadStreamHeadMessageInternal(string streamId, CancellationToken cancellationToken); protected abstract IStreamSubscription SubscribeToStreamInternal( string streamId, diff --git a/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs b/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs index 5481de59e..c598bea34 100644 --- a/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs +++ b/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs @@ -94,5 +94,65 @@ public async Task Given_filled_stream_when_get_head_version_then_returns_expecte head.ShouldBe(2); } + + [Fact] + public async Task Given_empty_store_when_get_head_message_Then_should_be_minus_one() + { + var head = await Store.ReadHeadMessage(); + + head.ShouldBe(default); + } + + [Fact] + public async Task Given_store_with_empty_stream_when_get_head_message_Then_should_be_minus_one() + { + await Store.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamMessages()); + + var head = await Store.ReadHeadMessage(); + + head.ShouldBe(default); + } + + [Fact] + public async Task Given_store_with_messages_then_can_get_head_message() + { + var messages = CreateNewStreamMessages(1, 2, 3); + await Store.AppendToStream("stream-1", ExpectedVersion.NoStream, messages); + + var head = await Store.ReadHeadMessage(); + + head.ShouldBe(messages[3]); + } + + [Fact] + public async Task Given_no_stream_when_get_stream_head_message_then_returns_expected_result() + { + await Store.AppendToStream("other-stream", ExpectedVersion.NoStream, CreateNewStreamMessages(1)); + + var head = await Store.ReadStreamHeadMessage("this-stream"); + + head.ShouldBe(default); + } + + [Fact] + public async Task Given_empty_stream_when_get_stream_head_message_then_returns_expected_result() + { + await Store.AppendToStream("this-stream", ExpectedVersion.NoStream, CreateNewStreamMessages()); + + var head = await Store.ReadStreamHeadMessage("this-stream"); + + head.ShouldBe(default); + } + + [Fact] + public async Task Given_filled_stream_when_get_stream_head_message_then_returns_expected_result() + { + var messages = CreateNewStreamMessages(1, 2, 3); + await Store.AppendToStream("this-stream", ExpectedVersion.NoStream, messages); + + var head = await Store.ReadStreamHeadMessage("this-stream"); + + head.ShouldBe(messages[3]); + } } } \ No newline at end of file