diff --git a/src/SqlStreamStore/IReadonlyStreamStore.cs b/src/SqlStreamStore/IReadonlyStreamStore.cs
index 5039c022a..531766d3b 100644
--- a/src/SqlStreamStore/IReadonlyStreamStore.cs
+++ b/src/SqlStreamStore/IReadonlyStreamStore.cs
@@ -210,9 +210,19 @@ IAllStreamSubscription SubscribeToAll(
/// The cancellation instruction.
///
///
- /// The head position.
+ /// The head position, or -1 if there is none.
///
Task ReadHeadPosition(CancellationToken cancellationToken = default);
+ ///
+ /// Reads the head message (the very latest message).
+ ///
+ ///
+ /// The cancellation instruction.
+ ///
+ ///
+ /// The head message, or null if there is no head message.
+ ///
+ Task ReadHeadMessage(CancellationToken cancellationToken = default);
///
/// Reads the head position (the position of the very latest message) of a particular stream.
@@ -242,6 +252,20 @@ IAllStreamSubscription SubscribeToAll(
///
Task ReadStreamHeadVersion(StreamId streamId, CancellationToken cancellationToken = default);
+ ///
+ /// Reads the head message (the very latest message) of a particular stream.
+ ///
+ ///
+ /// The stream to read the head message of.
+ ///
+ ///
+ /// The cancellation instruction.
+ ///
+ ///
+ /// The head message of the stream, or null if no such stream or an empty stream.
+ ///
+ Task ReadStreamHeadMessage(StreamId streamId, CancellationToken cancellationToken = default);
+
///
/// Gets the stream metadata.
///
diff --git a/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs b/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs
index aa9fd88ed..9f2307b6c 100644
--- a/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs
+++ b/src/SqlStreamStore/InMemory/InMemoryStreamStore.cs
@@ -713,6 +713,35 @@ protected override Task ReadStreamHeadVersionInternal(string streamId, Canc
}
}
+ protected override Task ReadHeadMessageInternal(CancellationToken cancellationToken)
+ {
+ var message = _allStream.LastOrDefault();
+ return message == null
+ ? Task.FromResult(default)
+ : Task.FromResult(
+ new StreamMessage(
+ message.StreamId, message.MessageId,
+ message.StreamVersion, message.Position,
+ message.Created, message.Type,
+ message.JsonMetadata, message.JsonData));
+ }
+
+ protected override Task ReadStreamHeadMessageInternal(string streamId, CancellationToken cancellationToken)
+ {
+ using(_lock.UseReadLock())
+ {
+ if(!_streams.TryGetValue(streamId, out InMemoryStream stream))
+ return Task.FromResult(default);
+ var message = stream.Messages[stream.Messages.Count - 1];
+ return Task.FromResult(
+ new StreamMessage(
+ message.StreamId, message.MessageId,
+ message.StreamVersion, message.Position,
+ message.Created, message.Type,
+ message.JsonMetadata, message.JsonData));
+ }
+ }
+
protected override IAllStreamSubscription SubscribeToAllInternal(
long? fromPosition,
AllStreamMessageReceived streamMessageReceived,
diff --git a/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs b/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs
index 3d780fe02..7a03a55b8 100644
--- a/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs
+++ b/src/SqlStreamStore/Infrastructure/ReadonlyStreamStoreBase.cs
@@ -240,6 +240,22 @@ public Task ReadStreamHeadPosition(StreamId streamId, CancellationToken ca
return ReadStreamHeadPositionInternal(streamId, cancellationToken);
}
+ public Task ReadHeadMessage(CancellationToken cancellationToken = default)
+ {
+ GuardAgainstDisposed();
+
+ return ReadHeadMessageInternal(cancellationToken);
+ }
+
+ public Task ReadStreamHeadMessage(StreamId streamId, CancellationToken cancellationToken = default)
+ {
+ if (streamId == null) throw new ArgumentNullException(nameof(streamId));
+
+ GuardAgainstDisposed();
+
+ return ReadStreamHeadMessageInternal(streamId, cancellationToken);
+ }
+
public Task ReadStreamHeadVersion(StreamId streamId, CancellationToken cancellationToken = default)
{
if (streamId == null) throw new ArgumentNullException(nameof(streamId));
@@ -318,6 +334,8 @@ protected abstract Task ReadStreamBackwardsInternal(
protected abstract Task ReadHeadPositionInternal(CancellationToken cancellationToken);
protected abstract Task ReadStreamHeadPositionInternal(string streamId, CancellationToken cancellationToken);
protected abstract Task ReadStreamHeadVersionInternal(string streamId, CancellationToken cancellationToken);
+ protected abstract Task ReadHeadMessageInternal(CancellationToken cancellationToken);
+ protected abstract Task ReadStreamHeadMessageInternal(string streamId, CancellationToken cancellationToken);
protected abstract IStreamSubscription SubscribeToStreamInternal(
string streamId,
diff --git a/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs b/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs
index 5481de59e..c598bea34 100644
--- a/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs
+++ b/tests/SqlStreamStore.AcceptanceTests/AcceptanceTests.ReadHeadCheckpoint.cs
@@ -94,5 +94,65 @@ public async Task Given_filled_stream_when_get_head_version_then_returns_expecte
head.ShouldBe(2);
}
+
+ [Fact]
+ public async Task Given_empty_store_when_get_head_message_Then_should_be_minus_one()
+ {
+ var head = await Store.ReadHeadMessage();
+
+ head.ShouldBe(default);
+ }
+
+ [Fact]
+ public async Task Given_store_with_empty_stream_when_get_head_message_Then_should_be_minus_one()
+ {
+ await Store.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamMessages());
+
+ var head = await Store.ReadHeadMessage();
+
+ head.ShouldBe(default);
+ }
+
+ [Fact]
+ public async Task Given_store_with_messages_then_can_get_head_message()
+ {
+ var messages = CreateNewStreamMessages(1, 2, 3);
+ await Store.AppendToStream("stream-1", ExpectedVersion.NoStream, messages);
+
+ var head = await Store.ReadHeadMessage();
+
+ head.ShouldBe(messages[3]);
+ }
+
+ [Fact]
+ public async Task Given_no_stream_when_get_stream_head_message_then_returns_expected_result()
+ {
+ await Store.AppendToStream("other-stream", ExpectedVersion.NoStream, CreateNewStreamMessages(1));
+
+ var head = await Store.ReadStreamHeadMessage("this-stream");
+
+ head.ShouldBe(default);
+ }
+
+ [Fact]
+ public async Task Given_empty_stream_when_get_stream_head_message_then_returns_expected_result()
+ {
+ await Store.AppendToStream("this-stream", ExpectedVersion.NoStream, CreateNewStreamMessages());
+
+ var head = await Store.ReadStreamHeadMessage("this-stream");
+
+ head.ShouldBe(default);
+ }
+
+ [Fact]
+ public async Task Given_filled_stream_when_get_stream_head_message_then_returns_expected_result()
+ {
+ var messages = CreateNewStreamMessages(1, 2, 3);
+ await Store.AppendToStream("this-stream", ExpectedVersion.NoStream, messages);
+
+ var head = await Store.ReadStreamHeadMessage("this-stream");
+
+ head.ShouldBe(messages[3]);
+ }
}
}
\ No newline at end of file