diff --git a/default.ps1 b/default.ps1
index bb86234ee..41a3a788c 100644
--- a/default.ps1
+++ b/default.ps1
@@ -47,6 +47,7 @@ task RunTests -depends Compile {
Run-Tests "Cedar.EventStore.GetEventStore.Tests"
Run-Tests "Cedar.EventStore.MsSql2008.Tests"
Run-Tests "Cedar.EventStore.Sqlite.Tests"
+ Run-Tests "Cedar.EventStore.Postgres.Tests"
}
task ILMerge -depends Compile {
@@ -69,6 +70,12 @@ task ILMerge -depends Compile {
$inputDlls = "$dllDir\$mainDllName.dll"
@( "EnsureThat" ) |% { $inputDlls = "$inputDlls $dllDir\$_.dll" }
Invoke-Expression "$ilmergePath /targetplatform:v4 /internalize /allowDup /target:library /log /out:$mergedDir\$mainDllName.dll $inputDlls"
+
+ $mainDllName = "Cedar.EventStore.Postgres"
+ $dllDir = "$srcDir\$mainDllName\bin\Release"
+ $inputDlls = "$dllDir\$mainDllName.dll"
+ @( "EnsureThat", "Npgsql" ) |% { $inputDlls = "$inputDlls $dllDir\$_.dll" }
+ Invoke-Expression "$ilmergePath /targetplatform:v4 /internalize /allowDup /target:library /log /out:$mergedDir\$mainDllName.dll $inputDlls"
}
task CreateNuGetPackages -depends ILMerge {
diff --git a/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject b/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject
index 8782fa034..724068ebc 100644
Binary files a/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject and b/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject differ
diff --git a/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.csproj b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.csproj
new file mode 100644
index 000000000..9b0869488
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.csproj
@@ -0,0 +1,102 @@
+
+
+
+
+
+ Debug
+ AnyCPU
+ {453FB5DB-99DC-42D3-9DFE-F81EDF98F5E3}
+ Library
+ Properties
+ Cedar.EventStore.Postgres.Tests
+ Cedar.EventStore.Postgres.Tests
+ v4.5.1
+ 512
+
+
+
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+
+
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+
+
+
+ ..\packages\FluentAssertions.3.3.0\lib\net45\FluentAssertions.dll
+ True
+
+
+ ..\packages\FluentAssertions.3.3.0\lib\net45\FluentAssertions.Core.dll
+ True
+
+
+ ..\packages\Npgsql.3.1.0-unstable0000\lib\net45\Npgsql.dll
+ True
+
+
+
+
+
+
+
+
+
+ ..\packages\xunit.abstractions.2.0.0\lib\net35\xunit.abstractions.dll
+ True
+
+
+ ..\packages\xunit.extensibility.core.2.0.0\lib\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.dll
+ True
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {148C90E9-0EA1-482E-94A9-F178294EFAC2}
+ Cedar.EventStore.Postgres
+
+
+ {F3FB96CF-4A3D-448D-A25E-6BC66E370BF6}
+ Cedar.EventStore.Tests
+
+
+ {3553E8E7-2C2A-45D5-BCB1-9AC7E5A209B2}
+ Cedar.EventStore
+
+
+
+
+
+ This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.v2.ncrunchproject b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.v2.ncrunchproject
new file mode 100644
index 000000000..724068ebc
Binary files /dev/null and b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.v2.ncrunchproject differ
diff --git a/src/Cedar.EventStore.Postgres.Tests/ConcurrentLoadTests.cs b/src/Cedar.EventStore.Postgres.Tests/ConcurrentLoadTests.cs
new file mode 100644
index 000000000..709377860
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/ConcurrentLoadTests.cs
@@ -0,0 +1,38 @@
+namespace Cedar.EventStore.Postgres.Tests
+{
+ using Exceptions;
+ using System;
+ using System.Threading.Tasks;
+ using Xunit;
+ using Xunit.Abstractions;
+
+ public class ConcurrentLoadTests
+ {
+ public ConcurrentLoadTests(ITestOutputHelper testOutputHelper)
+ { }
+
+ [Fact]
+ public async Task concurrent_appends_might_throw_WrongExpectedVersionException_and_thats_ok()
+ {
+ var eventStore = new PostgresEventStore(@"Server=127.0.0.1;Port=5432;Database=cedar_tests;User Id=postgres;Password=postgres;");
+
+ await eventStore.DropAll(ignoreErrors: true);
+ await eventStore.InitializeStore();
+
+ using (eventStore)
+ {
+ for (var i = 0; i < 3; i++)
+ {
+ Parallel.For(0, 4, async (iteration) =>
+ {
+ var streamId = string.Concat("stream-", iteration);
+ await eventStore
+ .AppendToStream(streamId, ExpectedVersion.Any, new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"metadata\""))
+ .MightThrow("Append failed due to WrongExpectedVersion. Stream: {0}, Expected version: -2".FormatWith(streamId));
+ });
+
+ }
+ }
+ }
+ }
+}
diff --git a/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreFixture.cs b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreFixture.cs
new file mode 100644
index 000000000..a7ec33fa1
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreFixture.cs
@@ -0,0 +1,28 @@
+namespace Cedar.EventStore.Postgres.Tests
+{
+ using System.Threading.Tasks;
+
+ public class PostgresEventStoreFixture : EventStoreAcceptanceTestFixture
+ {
+ private readonly string _schema;
+ public PostgresEventStoreFixture(string schema = "public")
+ {
+ _schema = schema;
+ }
+
+ public override Task<IEventStore> GetEventStore()
+ {
+ return GetEventStore(_schema);
+ }
+
+ private async Task<IEventStore> GetEventStore(string schema)
+ {
+ var eventStore = new PostgresEventStore(@"Server=127.0.0.1;Port=5432;Database=cedar_tests;User Id=postgres;Password=postgres;", schema);
+
+ await eventStore.DropAll(ignoreErrors: true);
+ await eventStore.InitializeStore();
+
+ return eventStore;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreTests.cs b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreTests.cs
new file mode 100644
index 000000000..8563f64a5
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreTests.cs
@@ -0,0 +1,18 @@
+namespace Cedar.EventStore.Postgres.Tests
+{
+ using System.Threading.Tasks;
+ using Xunit;
+ using Xunit.Abstractions;
+
+ public class PostgresEventStoreTests : EventStoreAcceptanceTests
+ {
+ public PostgresEventStoreTests(ITestOutputHelper testOutputHelper)
+ : base(testOutputHelper)
+ {}
+
+ protected override EventStoreAcceptanceTestFixture GetFixture()
+ {
+ return new PostgresEventStoreFixture();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres.Tests/Properties/AssemblyInfo.cs b/src/Cedar.EventStore.Postgres.Tests/Properties/AssemblyInfo.cs
new file mode 100644
index 000000000..996ba0a20
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Cedar.EventStore.Postgres.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Cedar.EventStore.Postgres.Tests")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("dcf6bdff-8e51-46ae-9bcd-94a6ab78b26a")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/src/Cedar.EventStore.Postgres.Tests/SecondarySchemaTests.cs b/src/Cedar.EventStore.Postgres.Tests/SecondarySchemaTests.cs
new file mode 100644
index 000000000..ac6565e3d
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/SecondarySchemaTests.cs
@@ -0,0 +1,30 @@
+namespace Cedar.EventStore.Postgres.Tests
+{
+ using System.Threading.Tasks;
+ using Xunit;
+ using Xunit.Abstractions;
+
+ public class SecondarySchemaTests : EventStoreAcceptanceTests
+ {
+ public SecondarySchemaTests(ITestOutputHelper testOutputHelper)
+ : base(testOutputHelper)
+ {}
+
+ protected override EventStoreAcceptanceTestFixture GetFixture()
+ {
+ return new PostgresEventStoreFixture("secondary_schema");
+ }
+
+ [Fact]
+ public async Task can_store_events_in_different_schemas()
+ {
+ using (var defaultStore = await GetFixture().GetEventStore())
+ using (var secondaryStore = await new PostgresEventStoreFixture("saga_events").GetEventStore())
+ {
+ const string streamId = "stream-1";
+ await defaultStore.AppendToStream(streamId, ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3));
+ await secondaryStore.AppendToStream(streamId, ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres.Tests/TaskExtensions.cs b/src/Cedar.EventStore.Postgres.Tests/TaskExtensions.cs
new file mode 100644
index 000000000..8077379cf
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/TaskExtensions.cs
@@ -0,0 +1,26 @@
+namespace Cedar.EventStore.Postgres.Tests
+{
+ using FluentAssertions;
+ using System;
+ using System.Threading.Tasks;
+
+ internal static class TaskExtensions
+ {
+ internal static async Task MightThrow(this Task task, string message)
+ {
+ try
+ {
+ await task;
+ }
+ catch (Exception ex)
+ {
+ ex.Should().BeOfType<WrongExpectedVersionException>();
+ ex.Message.Should().Be(message);
+
+ return;
+ }
+
+ //it didn't throw an exception, that's ok too
+ }
+ }
+}
diff --git a/src/Cedar.EventStore.Postgres.Tests/packages.config b/src/Cedar.EventStore.Postgres.Tests/packages.config
new file mode 100644
index 000000000..be379f255
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres.Tests/packages.config
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.csproj b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.csproj
new file mode 100644
index 000000000..f0077a787
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.csproj
@@ -0,0 +1,92 @@
+
+
+
+
+ Debug
+ AnyCPU
+ {148C90E9-0EA1-482E-94A9-F178294EFAC2}
+ Library
+ Properties
+ Cedar.EventStore.Postgres
+ Cedar.EventStore.Postgres
+ v4.5
+ 512
+
+
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+
+
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+ bin\Release\Cedar.EventStore.Postgres.XML
+
+
+
+ ..\packages\Ensure.That.2.0.0\lib\portable-net4+sl5+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\EnsureThat.dll
+ True
+
+
+ ..\packages\Npgsql.3.0.3\lib\net45\Npgsql.dll
+ True
+
+
+
+
+
+
+
+
+
+
+
+
+ Properties\SharedAssemblyInfo.cs
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {3553E8E7-2C2A-45D5-BCB1-9AC7E5A209B2}
+ Cedar.EventStore
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.nuspec b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.nuspec
new file mode 100644
index 000000000..0a4683089
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.nuspec
@@ -0,0 +1,29 @@
+
+
+
+ Cedar.EventStore.Postgres
+ 0.0.0.0
+ Damian Hickey
+ Damian Hickey
+ https://github.com/damianh/Cedar.EventStore
+ https://github.com/damianh/Cedar.EventStore/blob/master/LICENSE
+ false
+ Cedar - Event Store - PostgreSQL
+
+ PostgreSQL plugin for Cedar EventStore
+
+ cqrs event-sourcing domain-driven-design postgres postgresql pgsql
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.v2.ncrunchproject b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.v2.ncrunchproject
new file mode 100644
index 000000000..724068ebc
Binary files /dev/null and b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.v2.ncrunchproject differ
diff --git a/src/Cedar.EventStore.Postgres/CheckpointExtensions.cs b/src/Cedar.EventStore.Postgres/CheckpointExtensions.cs
new file mode 100644
index 000000000..bb21ab6b6
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/CheckpointExtensions.cs
@@ -0,0 +1,14 @@
+namespace Cedar.EventStore.Postgres
+{
+ internal static class CheckpointExtensions
+ {
+ internal static long GetOrdinal(this Checkpoint checkpoint)
+ {
+ if(ReferenceEquals(checkpoint, Checkpoint.Start))
+ {
+ return -1;
+ }
+ return ReferenceEquals(checkpoint, Checkpoint.End) ? long.MaxValue : long.Parse(checkpoint.Value);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/InterlockedBoolean.cs b/src/Cedar.EventStore.Postgres/InterlockedBoolean.cs
new file mode 100644
index 000000000..304063631
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/InterlockedBoolean.cs
@@ -0,0 +1,72 @@
+//
+// Copyright 2013 Hans Wolff
+//
+// Source: https://gist.github.com/hanswolff/7926751
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+
+namespace Cedar.EventStore.Postgres
+{
+ using System.Threading;
+
+ ///
+ /// Interlocked support for boolean values
+ ///
+ internal class InterlockedBoolean
+ {
+ private int _value;
+
+ ///
+ /// Current value
+ ///
+ public bool Value
+ {
+ get { return this._value == 1; }
+ }
+
+ ///
+ /// Initializes a new instance of
+ ///
+ /// initial value
+ public InterlockedBoolean(bool initialValue = false)
+ {
+ this._value = initialValue ? 1 : 0;
+ }
+
+ ///
+ /// Sets a new value
+ ///
+ /// new value
+ /// the original value before any operation was performed
+ public bool Set(bool newValue)
+ {
+ var oldValue = Interlocked.Exchange(ref this._value, newValue ? 1 : 0);
+ return oldValue == 1;
+ }
+
+ ///
+ /// Compares the current value and the comparand for equality and, if they are equal,
+ /// replaces the current value with the new value in an atomic/thread-safe operation.
+ ///
+ /// new value
+ /// value to compare the current value with
+ /// the original value before any operation was performed
+ public bool CompareExchange(bool newValue, bool comparand)
+ {
+ var oldValue = Interlocked.CompareExchange(ref this._value, newValue ? 1 : 0, comparand ? 1 : 0);
+ return oldValue == 1;
+ }
+ }
+}
diff --git a/src/Cedar.EventStore.Postgres/InterlockedBooleanExtensions.cs b/src/Cedar.EventStore.Postgres/InterlockedBooleanExtensions.cs
new file mode 100644
index 000000000..ae8be7978
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/InterlockedBooleanExtensions.cs
@@ -0,0 +1,10 @@
+namespace Cedar.EventStore.Postgres
+{
+ internal static class InterlockedBooleanExtensions
+ {
+ internal static bool EnsureCalledOnce(this InterlockedBoolean interlockedBoolean)
+ {
+ return interlockedBoolean.CompareExchange(true, false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/PostgresEventStore.cs b/src/Cedar.EventStore.Postgres/PostgresEventStore.cs
new file mode 100644
index 000000000..db0c01f8e
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/PostgresEventStore.cs
@@ -0,0 +1,557 @@
+namespace Cedar.EventStore.Postgres
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Configuration;
+ using System.Data;
+ using System.Linq;
+ using System.Security.Cryptography;
+ using System.Text;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ using Cedar.EventStore.Exceptions;
+ using Cedar.EventStore.Postgres.SqlScripts;
+
+ using EnsureThat;
+
+ using Npgsql;
+
+ using NpgsqlTypes;
+
+ public class PostgresEventStore : IEventStore
+ {
+ private readonly Func<Task<NpgsqlConnection>> _createAndOpenConnection;
+
+ private readonly InterlockedBoolean _isDisposed = new InterlockedBoolean();
+
+ private readonly Scripts _scripts;
+
+ public PostgresEventStore(string connectionStringOrConnectionStringName, string schema = "public")
+ {
+ if(connectionStringOrConnectionStringName.IndexOf(';') > -1)
+ {
+ var builder = new NpgsqlConnectionStringBuilder(connectionStringOrConnectionStringName);
+ _createAndOpenConnection = async () =>
+ {
+ var connection = new NpgsqlConnection(builder);
+ await connection.OpenAsync();
+ return connection;
+ };
+ }
+ else
+ {
+ _createAndOpenConnection = async () =>
+ {
+ var connection = new NpgsqlConnection(ConfigurationManager.ConnectionStrings[connectionStringOrConnectionStringName].ConnectionString);
+ await connection.OpenAsync();
+ return connection;
+ };
+ }
+
+ _scripts = new Scripts(schema);
+ }
+
+ public async Task AppendToStream(
+ string streamId,
+ int expectedVersion,
+ IEnumerable<NewStreamEvent> events,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Ensure.That(streamId, "streamId").IsNotNullOrWhiteSpace();
+ Ensure.That(expectedVersion, "expectedVersion").IsGte(-2);
+ Ensure.That(events, "events").IsNotNull();
+
+ var streamIdInfo = HashStreamId(streamId);
+
+ using(var connection = await _createAndOpenConnection())
+ using(var tx = connection.BeginTransaction(IsolationLevel.Serializable))
+ {
+ int streamIdInternal = -1;
+ int currentVersion = expectedVersion;
+ bool isDeleted = false;
+
+ if(expectedVersion == ExpectedVersion.NoStream)
+ {
+ try
+ {
+ using (
+ var command = new NpgsqlCommand(_scripts.Functions.CreateStream, connection, tx)
+ {
+ CommandType
+ =
+ CommandType
+ .StoredProcedure
+ })
+ {
+ command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId);
+ command.Parameters.AddWithValue(":stream_id_original", streamIdInfo.StreamIdOriginal);
+
+ streamIdInternal =
+ (int)await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext();
+ }
+ }
+ catch(NpgsqlException ex)
+ {
+ tx.Rollback();
+
+ if(ex.Code == "23505")
+ {
+ // 23505 = unique_violation: the stream row already exists, so ExpectedVersion.NoStream is wrong
+ throw new WrongExpectedVersionException(
+ Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion), ex);
+ }
+
+ throw;
+ }
+ }
+ else
+ {
+ using (var command = new NpgsqlCommand(_scripts.Functions.GetStream, connection, tx) { CommandType = CommandType.StoredProcedure })
+ {
+ command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId);
+
+ using(var dr = await command.ExecuteReaderAsync().NotOnCapturedContext())
+ {
+ while (await dr.ReadAsync().NotOnCapturedContext())
+ {
+ streamIdInternal = dr.GetInt32(0);
+ isDeleted = dr.GetBoolean(1);
+
+ if (!isDeleted)
+ {
+ currentVersion = dr.IsDBNull(2) ? -1 : dr.GetInt32(2);
+ }
+ }
+ }
+ }
+
+ if(isDeleted)
+ {
+ throw new StreamDeletedException(Messages.EventStreamIsDeleted.FormatWith(streamId));
+ }
+
+ if(expectedVersion != ExpectedVersion.Any && currentVersion != expectedVersion)
+ {
+ throw new WrongExpectedVersionException(
+ Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion));
+ }
+
+ if(streamIdInternal == -1)
+ {
+ // create the stream as it doesn't exist
+
+ try
+ {
+ using (
+ var command = new NpgsqlCommand(_scripts.Functions.CreateStream, connection, tx)
+ {
+ CommandType
+ =
+ CommandType
+ .StoredProcedure
+ })
+ {
+ command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId);
+ command.Parameters.AddWithValue(":stream_id_original", streamIdInfo.StreamIdOriginal);
+
+ streamIdInternal =
+ (int)await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext();
+ }
+ }
+ catch (NpgsqlException ex)
+ {
+ if (ex.Code == "40001")
+ {
+ // could not serialize access due to read/write dependencies among transactions
+ throw new WrongExpectedVersionException(
+ Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion), ex);
+ }
+
+ // for error code 40001 Postgres has already aborted the transaction;
+ // only roll back explicitly for other errors
+ tx.Rollback();
+
+ throw;
+ }
+ }
+ }
+
+ try
+ {
+ using (var writer = connection.BeginBinaryImport(_scripts.BulkCopyEvents) )
+ {
+ foreach (var @event in events)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ writer.Cancel();
+ tx.Rollback();
+ break;
+ }
+
+ currentVersion++;
+ writer.StartRow();
+ writer.Write(streamIdInternal, NpgsqlDbType.Integer);
+ writer.Write(currentVersion, NpgsqlDbType.Integer);
+ writer.Write(@event.EventId, NpgsqlDbType.Uuid);
+ writer.Write(SystemClock.GetUtcNow(), NpgsqlDbType.TimestampTZ);
+ writer.Write(@event.Type);
+ writer.Write(@event.JsonData, NpgsqlDbType.Json);
+ writer.Write(@event.JsonMetadata, NpgsqlDbType.Json);
+ }
+ }
+
+ tx.Commit();
+ }
+ catch (NpgsqlException ex)
+ {
+ if (ex.Code == "40001")
+ {
+ // could not serialize access due to read/write dependencies among transactions
+ throw new WrongExpectedVersionException(
+ Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion), ex);
+ }
+
+ // for error code 40001 Postgres has already aborted the transaction;
+ // only roll back explicitly for other errors
+ tx.Rollback();
+
+ throw;
+ }
+ }
+ }
+
+ public Task DeleteStream(
+ string streamId,
+ int expectedVersion = ExpectedVersion.Any,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Ensure.That(streamId, "streamId").IsNotNullOrWhiteSpace();
+ Ensure.That(expectedVersion, "expectedVersion").IsGte(-2);
+
+ var streamIdInfo = HashStreamId(streamId);
+
+ return expectedVersion == ExpectedVersion.Any
+ ? this.DeleteStreamAnyVersion(streamIdInfo, cancellationToken)
+ : this.DeleteStreamExpectedVersion(streamIdInfo, expectedVersion, cancellationToken);
+ }
+
+ private async Task DeleteStreamAnyVersion(
+ StreamIdInfo streamIdInfo,
+ CancellationToken cancellationToken)
+ {
+ using (var connection = await _createAndOpenConnection())
+ using (var command = new NpgsqlCommand(_scripts.Functions.DeleteStreamAnyVersion, connection) { CommandType = CommandType.StoredProcedure })
+ {
+ command.Parameters.AddWithValue("stream_id", streamIdInfo.StreamId);
+ await command
+ .ExecuteNonQueryAsync(cancellationToken)
+ .NotOnCapturedContext();
+ }
+ }
+
+
+ private async Task DeleteStreamExpectedVersion(
+ StreamIdInfo streamIdInfo,
+ int expectedVersion,
+ CancellationToken cancellationToken)
+ {
+ using (var connection = await _createAndOpenConnection())
+ using (var command = new NpgsqlCommand(_scripts.Functions.DeleteStreamExpectedVersion, connection) { CommandType = CommandType.StoredProcedure })
+ {
+ command.Parameters.AddWithValue("stream_id", streamIdInfo.StreamId);
+ command.Parameters.AddWithValue("expected_version", expectedVersion);
+ try
+ {
+ await command
+ .ExecuteNonQueryAsync(cancellationToken)
+ .NotOnCapturedContext();
+ }
+ catch(NpgsqlException ex)
+ {
+ if(ex.MessageText == "WrongExpectedVersion")
+ {
+ throw new WrongExpectedVersionException(Messages.DeleteStreamFailedWrongExpectedVersion.FormatWith(streamIdInfo.StreamIdOriginal, expectedVersion));
+ }
+ throw;
+ }
+ }
+ }
+
+ public async Task<AllEventsPage> ReadAll(
+ Checkpoint checkpoint,
+ int maxCount,
+ ReadDirection direction = ReadDirection.Forward,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Ensure.That(checkpoint, "checkpoint").IsNotNull();
+ Ensure.That(maxCount, "maxCount").IsGt(0);
+
+ if(this._isDisposed.Value)
+ {
+ throw new ObjectDisposedException("PostgresEventStore");
+ }
+
+ long ordinal = checkpoint.GetOrdinal();
+
+ var commandText = direction == ReadDirection.Forward ? _scripts.ReadAllForward : _scripts.ReadAllBackward;
+
+ using (var connection = await _createAndOpenConnection())
+ using (var command = new NpgsqlCommand(commandText, connection))// { CommandType = CommandType.StoredProcedure })
+ {
+ command.Parameters.AddWithValue(":ordinal", NpgsqlDbType.Bigint, ordinal);
+ command.Parameters.AddWithValue(":count", NpgsqlDbType.Integer, maxCount + 1); //Read extra row to see if at end or not
+
+ List<StreamEvent> streamEvents = new List<StreamEvent>();
+
+ long latestCheckpoint = ordinal;
+
+ using(var reader = await command.ExecuteReaderAsync(cancellationToken).NotOnCapturedContext())
+ {
+ if (!reader.HasRows)
+ {
+ return new AllEventsPage(checkpoint.Value,
+ checkpoint.Value,
+ true,
+ direction,
+ streamEvents.ToArray());
+ }
+
+
+ while (await reader.ReadAsync(cancellationToken).NotOnCapturedContext())
+ {
+ var streamId = reader.GetString(0);
+ var StreamVersion = reader.GetInt32(1);
+ ordinal = reader.GetInt64(2);
+ var eventId = reader.GetGuid(3);
+ var created = reader.GetDateTime(4);
+ var type = reader.GetString(5);
+ var jsonData = reader.GetString(6);
+ var jsonMetadata = reader.GetString(7);
+
+ latestCheckpoint = ordinal;
+
+ var streamEvent = new StreamEvent(streamId,
+ eventId,
+ StreamVersion,
+ ordinal.ToString(),
+ created,
+ type,
+ jsonData,
+ jsonMetadata);
+
+ streamEvents.Add(streamEvent);
+ }
+ }
+
+ bool isEnd = true;
+ string nextCheckpoint = latestCheckpoint.ToString();
+
+ if(streamEvents.Count == maxCount + 1)
+ {
+ isEnd = false;
+ streamEvents.RemoveAt(maxCount);
+ }
+ else
+ {
+ nextCheckpoint = (latestCheckpoint + 1).ToString();
+ }
+
+
+ return new AllEventsPage(checkpoint.Value,
+ nextCheckpoint,
+ isEnd,
+ direction,
+ streamEvents.ToArray());
+ }
+ }
+
+ public async Task<StreamEventsPage> ReadStream(
+ string streamId,
+ int start,
+ int count,
+ ReadDirection direction = ReadDirection.Forward,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Ensure.That(streamId, "streamId").IsNotNull();
+ Ensure.That(start, "start").IsGte(-1);
+ Ensure.That(count, "count").IsGte(0);
+
+ var streamIdInfo = HashStreamId(streamId);
+
+ var StreamVersion = start == StreamPosition.End ? int.MaxValue : start;
+ string commandText;
+ Func<List<StreamEvent>, int> getNextSequenceNumber;
+ if(direction == ReadDirection.Forward)
+ {
+ commandText = _scripts.Functions.ReadStreamForward;
+ getNextSequenceNumber = events => events.Count > 0 ? events.Last().StreamVersion + 1 : -1; //todo: review this
+ }
+ else
+ {
+ commandText = _scripts.Functions.ReadStreamBackward;
+ getNextSequenceNumber = events => events.Count > 0 ? events.Last().StreamVersion - 1 : -1; //todo: review this
+ }
+
+ using (var connection = await _createAndOpenConnection())
+ using (var tx = connection.BeginTransaction())
+ using (var command = new NpgsqlCommand(commandText, connection, tx) { CommandType = CommandType.StoredProcedure })
+ {
+ command.Parameters.AddWithValue(":stream_id", NpgsqlDbType.Text, streamIdInfo.StreamId);
+ command.Parameters.AddWithValue(":count", NpgsqlDbType.Integer, count + 1); //Read extra row to see if at end or not
+ command.Parameters.AddWithValue(":stream_version", NpgsqlDbType.Integer, StreamVersion);
+
+ List<StreamEvent> streamEvents = new List<StreamEvent>();
+
+ using(var reader = await command.ExecuteReaderAsync(cancellationToken).NotOnCapturedContext())
+ {
+ await reader.ReadAsync(cancellationToken).NotOnCapturedContext();
+ bool doesNotExist = reader.IsDBNull(0);
+ if(doesNotExist)
+ {
+ return new StreamEventsPage(
+ streamId, PageReadStatus.StreamNotFound, start, -1, -1, direction, isEndOfStream: true);
+ }
+
+
+ // Read IsDeleted result set
+ var isDeleted = reader.GetBoolean(1);
+ if(isDeleted)
+ {
+ return new StreamEventsPage(
+ streamId, PageReadStatus.StreamDeleted, 0, 0, 0, direction, isEndOfStream: true);
+ }
+
+
+ // Read Events result set
+ await reader.NextResultAsync(cancellationToken).NotOnCapturedContext();
+ while(await reader.ReadAsync(cancellationToken).NotOnCapturedContext())
+ {
+ var StreamVersion1 = reader.GetInt32(0);
+ var ordinal = reader.GetInt64(1);
+ var eventId = reader.GetGuid(2);
+ var created = reader.GetDateTime(3);
+ var type = reader.GetString(4);
+ var jsonData = reader.GetString(5);
+ var jsonMetadata = reader.GetString(6);
+
+ var streamEvent = new StreamEvent(
+ streamId, eventId, StreamVersion1, ordinal.ToString(), created, type, jsonData, jsonMetadata);
+
+ streamEvents.Add(streamEvent);
+ }
+
+ // Read last event revision result set
+ await reader.NextResultAsync(cancellationToken).NotOnCapturedContext();
+ await reader.ReadAsync(cancellationToken).NotOnCapturedContext();
+ var lastStreamVersion = reader.HasRows ? reader.GetInt32(0) : -1; //todo: verify HasRows is still meaningful after ReadAsync; -1 fallback for empty result set
+
+
+ bool isEnd = true;
+ if(streamEvents.Count == count + 1)
+ {
+ isEnd = false;
+ streamEvents.RemoveAt(count);
+ }
+
+ return new StreamEventsPage(
+ streamId,
+ PageReadStatus.Success,
+ start,
+ getNextSequenceNumber(streamEvents),
+ lastStreamVersion,
+ direction,
+ isEnd,
+ streamEvents.ToArray());
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ if(this._isDisposed.EnsureCalledOnce())
+ {
+ return;
+ }
+ //no clean up to do, lean on Npgsql connection pooling
+ }
+
+ public async Task InitializeStore(
+ bool ignoreErrors = false,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ using (var connection = await _createAndOpenConnection())
+ using(var cmd = new NpgsqlCommand(_scripts.InitializeStore, connection))
+ {
+ if (ignoreErrors)
+ {
+ await ExecuteAndIgnoreErrors(() => cmd.ExecuteNonQueryAsync(cancellationToken))
+ .NotOnCapturedContext();
+ }
+ else
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken)
+ .NotOnCapturedContext();
+ }
+ }
+ }
+
+ public async Task DropAll(
+ bool ignoreErrors = false,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ using (var connection = await _createAndOpenConnection())
+ using(var cmd = new NpgsqlCommand(_scripts.DropAll, connection))
+ {
+ if (ignoreErrors)
+ {
+ await ExecuteAndIgnoreErrors(() => cmd.ExecuteNonQueryAsync(cancellationToken))
+ .NotOnCapturedContext();
+ }
+ else
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken)
+ .NotOnCapturedContext();
+ }
+ }
+ }
+
+ private static async Task<T> ExecuteAndIgnoreErrors<T>(Func<Task<T>> operation)
+ {
+ try
+ {
+ return await operation().NotOnCapturedContext();
+ }
+ catch
+ {
+ return default(T);
+ }
+ }
+
+ private static StreamIdInfo HashStreamId(string streamId)
+ {
+ Ensure.That(streamId, "streamId").IsNotNullOrWhiteSpace();
+
+ Guid _;
+ if(Guid.TryParse(streamId, out _))
+ {
+ return new StreamIdInfo(streamId, streamId);
+ }
+
+ byte[] hashBytes = SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(streamId));
+ var hashedStreamId = BitConverter.ToString(hashBytes).Replace("-", "");
+ return new StreamIdInfo(hashedStreamId, streamId);
+ }
+
+ private class StreamIdInfo
+ {
+ public readonly string StreamId;
+ public readonly string StreamIdOriginal;
+
+ public StreamIdInfo(string streamId, string streamIdOriginal)
+ {
+ this.StreamId = streamId;
+ this.StreamIdOriginal = streamIdOriginal;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/Properties/AssemblyInfo.cs b/src/Cedar.EventStore.Postgres/Properties/AssemblyInfo.cs
new file mode 100644
index 000000000..240c04cf7
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/Properties/AssemblyInfo.cs
@@ -0,0 +1,4 @@
+using System.Reflection;
+
+[assembly: AssemblyTitle("Cedar.EventStore.Postgres")]
+[assembly: AssemblyDescription("")]
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/BulkCopyEvents.sql b/src/Cedar.EventStore.Postgres/SqlScripts/BulkCopyEvents.sql
new file mode 100644
index 000000000..f422b0592
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/BulkCopyEvents.sql
@@ -0,0 +1,2 @@
+COPY $schema$.events (stream_id_internal, stream_version, id, created, type, json_data, json_metadata)
+FROM STDIN BINARY
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/CreateStream.sql b/src/Cedar.EventStore.Postgres/SqlScripts/CreateStream.sql
new file mode 100644
index 000000000..1eacf7cde
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/CreateStream.sql
@@ -0,0 +1,3 @@
+INSERT INTO $schema$.streams(id, id_original)
+VALUES (:stream_id, :stream_id_original)
+RETURNING id_internal;
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/Dev.sql b/src/Cedar.EventStore.Postgres/SqlScripts/Dev.sql
new file mode 100644
index 000000000..3160e0320
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/Dev.sql
@@ -0,0 +1,306 @@
+DROP TABLE IF EXISTS events;
+DROP TABLE IF EXISTS streams;
+DROP TYPE IF EXISTS new_stream_events;
+CREATE TABLE streams(
+ id text NOT NULL,
+ id_original text NOT NULL,
+ id_internal SERIAL PRIMARY KEY NOT NULL,
+ is_deleted boolean DEFAULT (false) NOT NULL
+);
+CREATE UNIQUE INDEX ix_streams_id ON streams USING btree(id);
+
+CREATE TABLE events(
+ stream_id_internal integer NOT NULL,
+ stream_version integer NOT NULL,
+ ordinal SERIAL PRIMARY KEY NOT NULL ,
+ id uuid NOT NULL,
+ created timestamp NOT NULL,
+ type text NOT NULL,
+ json_data json NOT NULL,
+ json_metadata json ,
+ CONSTRAINT fk_events_streams FOREIGN KEY (stream_id_internal) REFERENCES streams(id_internal)
+);
+
+CREATE UNIQUE INDEX ix_events_stream_id_internal_revision ON events USING btree(stream_id_internal, stream_version);
+
+CREATE TYPE new_stream_events AS (
+ stream_version integer,
+ id uuid,
+ created timestamp,
+ type text,
+ json_data json,
+ json_metadata json
+);
+
+-- ExpectedVersion.NoStream
+
+new_events new_stream_events;
+stream_id text;
+
+stream_id = 'stream-1';
+
+
+INSERT INTO @newEvents
+ (
+ Id ,
+ [Type] ,
+ JsonData ,
+ JsonMetadata
+ ) VALUES
+ ('00000000-0000-0000-0000-000000000001', 'type1', '\"data1\"', '\"meta1\"'),
+ ('00000000-0000-0000-0000-000000000002', 'type2', '\"data2\"', '\"meta2\"'),
+ ('00000000-0000-0000-0000-000000000003', 'type3', '\"data3\"', '\"meta3\"'),
+ ('00000000-0000-0000-0000-000000000004', 'type4', '\"data4\"', '\"meta4\"');
+
+-- Actual SQL statement of interest
+SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+BEGIN TRANSACTION CreateStream;
+ DECLARE @count AS INT;
+ DECLARE @streamIdInternal AS INT;
+ BEGIN
+ INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamId);
+ SELECT @streamIdInternal = SCOPE_IDENTITY();
+
+ INSERT INTO dbo.Events (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
+ SELECT @streamIdInternal,
+ StreamVersion,
+ Id,
+ Created,
+ [Type],
+ JsonData,
+ JsonMetadata
+ FROM @newEvents;
+
+ END;
+ SELECT @streamIdInternal;
+COMMIT TRANSACTION CreateStream;
+
+
+SET @streamId = 'stream-2';
+SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+BEGIN TRANSACTION CreateStream;
+ BEGIN
+ INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamId);
+ SELECT @streamIdInternal = SCOPE_IDENTITY();
+
+ INSERT INTO dbo.Events (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
+ SELECT @streamIdInternal,
+ StreamVersion,
+ Id,
+ Created,
+ [Type],
+ JsonData,
+ JsonMetadata
+ FROM @newEvents
+
+ END;
+ SELECT @streamIdInternal;
+COMMIT TRANSACTION CreateStream;
+
+SELECT * FROM dbo.Streams;
+SELECT * FROM dbo.Events;
+
+DECLARE @pageNumber AS INT, @rowspPage AS INT;
+SET @pageNumber = 2;
+SET @rowspPage = 5;
+
+/* SQL Server 2012+ */
+ SELECT Streams.IdOriginal As StreamId,
+ Events.StreamVersion,
+ Events.Ordinal,
+ Events.Id AS EventId,
+ Events.Created,
+ Events.Type,
+ Events.JsonData,
+ Events.JsonMetadata
+ FROM Events
+ INNER JOIN Streams
+ ON Events.StreamIdInternal=Streams.IdInternal
+ ORDER BY Events.Ordinal
+ OFFSET ((@pageNumber - 1) * @rowspPage) ROWS
+ FETCH NEXT @rowspPage ROWS ONLY;
+
+ /* SQL Server 2000+ */
+ SELECT Id As StreamId,
+ StreamVersion,
+ Ordinal,
+ EventId,
+ Created,
+ [Type],
+ JsonData,
+ JsonMetadata
+ FROM (
+ SELECT ROW_NUMBER() OVER(ORDER BY Events.Ordinal) AS NUMBER,
+ Events.StreamIdInternal,
+ Events.StreamVersion,
+ Events.Ordinal,
+ Events.Id AS EventId,
+ Events.Created,
+ Events.Type,
+ Events.JsonData,
+ Events.JsonMetadata
+ FROM Events
+ ) AS PageTable
+ INNER JOIN Streams
+ ON StreamIdInternal=Streams.IdInternal
+ WHERE NUMBER BETWEEN ((@pageNumber - 1) * @RowspPage + 1) AND (@pageNumber * @rowspPage)
+ ORDER BY NUMBER;
+
+DECLARE @ordinal AS INT, @count1 AS INT;
+SET @ordinal = 2;
+SET @count1 = 5
+
+/* SQL Server 2008+ */
+ SELECT TOP(@count1)
+ Streams.IdOriginal As StreamId,
+ Events.StreamVersion,
+ Events.Ordinal,
+ Events.Id AS EventId,
+ Events.Created,
+ Events.Type,
+ Events.JsonData,
+ Events.JsonMetadata
+ FROM Events
+ INNER JOIN Streams
+ ON Events.StreamIdInternal=Streams.IdInternal
+ WHERE Events.Ordinal >= @ordinal
+ ORDER BY Events.Ordinal;
+
+ /* SQL Server 2008+ */
+ SELECT TOP(@count1)
+ Streams.IdOriginal As StreamId,
+ Events.StreamVersion,
+ Events.Ordinal,
+ Events.Id AS EventId,
+ Events.Created,
+ Events.Type,
+ Events.JsonData,
+ Events.JsonMetadata
+ FROM Events
+ INNER JOIN Streams
+ ON Events.StreamIdInternal=Streams.IdInternal
+ WHERE Events.Ordinal <= @ordinal
+ ORDER BY Events.Ordinal DESC;
+
+/* Delete Stream */
+BEGIN TRANSACTION DeleteStream
+ SELECT @streamIdInternal = Streams.IdInternal
+ FROM Streams
+ WHERE Streams.Id = @streamId;
+
+ DELETE FROM Events
+ WHERE Events.StreamIdInternal = @streamIdInternal;
+
+ UPDATE Streams
+ SET IsDeleted = '1'
+ WHERE Streams.Id = @streamId;
+COMMIT TRANSACTION DeleteStream
+
+SELECT * FROM dbo.Streams;
+SELECT * FROM dbo.Events;
+
+/* ReadStreamForward */
+
+SET @count = 5;
+SET @streamId = 'stream-1';
+DECLARE @StreamVersion AS INT = 0
+DECLARE @isDeleted AS BIT;
+
+ SELECT @streamIdInternal = Streams.IdInternal,
+ @isDeleted = Streams.IsDeleted
+ FROM Streams
+ WHERE Streams.Id = @streamId
+
+ SELECT @isDeleted AS IsDeleted
+
+ SELECT TOP(@count)
+ Events.StreamVersion,
+ Events.Ordinal,
+ Events.Id AS EventId,
+ Events.Created,
+ Events.Type,
+ Events.JsonData,
+ Events.JsonMetadata
+ FROM Events
+ INNER JOIN Streams
+ ON Events.StreamIdInternal = Streams.IdInternal
+ WHERE Events.StreamIDInternal = @streamIDInternal AND Events.StreamVersion >= @StreamVersion
+ ORDER BY Events.Ordinal;
+
+ SELECT TOP(1)
+ Events.StreamVersion
+ FROM Events
+ WHERE Events.StreamIDInternal = @streamIDInternal
+ ORDER BY Events.Ordinal DESC;
+
+/* ReadStreamBackward */
+
+SET @StreamVersion = 5;
+
+ SELECT @streamIdInternal = Streams.IdInternal,
+ @isDeleted = Streams.IsDeleted
+ FROM Streams
+ WHERE Streams.Id = @streamId
+
+ SELECT @isDeleted;
+
+ SELECT TOP(@count)
+ Streams.IdOriginal As StreamId,
+ Streams.IsDeleted as IsDeleted,
+ Events.StreamVersion,
+ Events.Ordinal,
+ Events.Id AS EventId,
+ Events.Created,
+ Events.Type,
+ Events.JsonData,
+ Events.JsonMetadata
+ FROM Events
+ INNER JOIN Streams
+ ON Events.StreamIdInternal = Streams.IdInternal
+ WHERE Events.StreamIDInternal = @streamIDInternal AND Events.StreamVersion <= @StreamVersion
+ ORDER BY Events.Ordinal DESC
+
+ SELECT TOP(1)
+ Events.StreamVersion
+ FROM Events
+ WHERE Events.StreamIDInternal = @streamIDInternal
+ ORDER BY Events.Ordinal DESC;
+
+/* Delete Stream with expected version */
+SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+BEGIN TRANSACTION DeleteStream
+ DECLARE @streamIdInternal2 AS INT;
+ DECLARE @expectedStreamVersion AS INT = 3;
+ DECLARE @latestStreamVersion AS INT;
+ SET @streamId = 'stream-1';
+
+ SELECT @streamIdInternal2 = Streams.IdInternal
+ FROM Streams
+ WHERE Streams.Id = @streamId;
+
+ IF @streamIdInternal2 IS NULL
+ BEGIN
+ ROLLBACK TRANSACTION DeleteStream;
+ RAISERROR('WrongExpectedVersion', 12,1);
+ END
+
+ SELECT TOP(1)
+ @latestStreamVersion = Events.StreamVersion
+ FROM Events
+ WHERE Events.StreamIDInternal = @streamIdInternal2
+ ORDER BY Events.Ordinal DESC;
+
+ IF @latestStreamVersion != @expectedStreamVersion
+ BEGIN
+ ROLLBACK TRANSACTION DeleteStream;
+ RAISERROR('WrongExpectedVersion', 12,2);
+ END
+
+ UPDATE Streams
+ SET IsDeleted = '1'
+ WHERE Streams.Id = @streamId ;
+
+ DELETE FROM Events
+ WHERE Events.StreamIdInternal = @streamIdInternal2;
+
+COMMIT TRANSACTION DeleteStream
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/DropAll.sql b/src/Cedar.EventStore.Postgres/SqlScripts/DropAll.sql
new file mode 100644
index 000000000..dd097b90a
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/DropAll.sql
@@ -0,0 +1,3 @@
+DROP TABLE IF EXISTS $schema$.events;
+DROP TABLE IF EXISTS $schema$.streams;
+DROP TYPE IF EXISTS $schema$.new_stream_events;
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/GetStream.sql b/src/Cedar.EventStore.Postgres/SqlScripts/GetStream.sql
new file mode 100644
index 000000000..0cd6f7ffc
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/GetStream.sql
@@ -0,0 +1,9 @@
+SELECT streams.id_internal,
+ streams.is_deleted,
+ events.stream_version
+FROM $schema$.streams
+LEFT JOIN $schema$.events
+ ON events.stream_id_internal = streams.id_internal
+WHERE streams.id = :stream_id
+ORDER BY events.ordinal DESC
+LIMIT 1;
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/InitializeStore.sql b/src/Cedar.EventStore.Postgres/SqlScripts/InitializeStore.sql
new file mode 100644
index 000000000..266d9e321
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/InitializeStore.sql
@@ -0,0 +1,276 @@
+
+
+CREATE SCHEMA IF NOT EXISTS $schema$;
+
+CREATE TABLE $schema$.streams(
+ id_internal SERIAL PRIMARY KEY,
+ id text NOT NULL,
+ id_original text NOT NULL,
+ is_deleted boolean DEFAULT (false) NOT NULL
+);
+
+CREATE UNIQUE INDEX ix_streams_id
+ON $schema$.streams
+USING btree(id);
+
+CREATE TABLE $schema$.events(
+ stream_id_internal integer NOT NULL,
+ stream_version integer NOT NULL,
+ ordinal SERIAL PRIMARY KEY NOT NULL ,
+ id uuid NOT NULL,
+ created timestamp NOT NULL,
+ type text NOT NULL,
+ json_data json NOT NULL,
+ json_metadata json ,
+ CONSTRAINT fk_events_streams FOREIGN KEY (stream_id_internal) REFERENCES $schema$.streams(id_internal)
+);
+
+CREATE UNIQUE INDEX ix_events_stream_id_internal_revision
+ON $schema$.events
+USING btree(stream_id_internal, stream_version DESC, ordinal DESC);
+
+CREATE OR REPLACE FUNCTION $schema$.create_stream(_stream_id text, _stream_id_original text)
+RETURNS integer AS
+$BODY$
+DECLARE
+ _result integer;
+BEGIN
+ INSERT INTO $schema$.streams(id, id_original)
+ VALUES (_stream_id, _stream_id_original)
+ RETURNING id_internal
+ INTO _result;
+
+ RETURN _result;
+END;
+$BODY$
+LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION $schema$.get_stream(_stream_id text)
+RETURNS TABLE(id_internal integer, is_deleted boolean, stream_version integer) AS
+$BODY$
+BEGIN
+ RETURN QUERY
+ SELECT $schema$.streams.id_internal,
+ $schema$.streams.is_deleted,
+ (SELECT max($schema$.events.stream_version) from $schema$.events where $schema$.events.stream_id_internal = $schema$.streams.id_internal)
+ FROM $schema$.streams
+ WHERE $schema$.streams.id = _stream_id;
+END;
+$BODY$
+LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION $schema$.read_all_forward(_ordinal bigint, _count integer)
+RETURNS TABLE(stream_id text, stream_version integer, ordinal bigint, event_id uuid, created timestamp, type text, json_data json, json_metadata json) AS
+$BODY$
+BEGIN
+ RETURN QUERY
+ SELECT
+ $schema$.streams.id_original As stream_id,
+ $schema$.events.stream_version,
+ $schema$.events.ordinal::bigint,
+ $schema$.events.id AS event_id,
+ $schema$.events.created,
+ $schema$.events.type,
+ $schema$.events.json_data,
+ $schema$.events.json_metadata
+ FROM $schema$.events
+ INNER JOIN $schema$.streams
+ ON $schema$.events.stream_id_internal = $schema$.streams.id_internal
+ WHERE $schema$.events.ordinal >= _ordinal
+ ORDER BY $schema$.events.ordinal
+ LIMIT _count;
+END;
+$BODY$
+LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION $schema$.read_all_backward(_ordinal bigint, _count integer)
+RETURNS TABLE(stream_id text, stream_version integer, ordinal bigint, event_id uuid, created timestamp, type text, json_data json, json_metadata json) AS
+$BODY$
+BEGIN
+ RETURN QUERY
+ SELECT
+ $schema$.streams.id_original As stream_id,
+ $schema$.events.stream_version,
+ $schema$.events.ordinal::bigint,
+ $schema$.events.id AS event_id,
+ $schema$.events.created,
+ $schema$.events.type,
+ $schema$.events.json_data,
+ $schema$.events.json_metadata
+ FROM $schema$.events
+ INNER JOIN $schema$.streams
+ ON $schema$.events.stream_id_internal = $schema$.streams.id_internal
+ WHERE $schema$.events.ordinal <= _ordinal
+ ORDER BY $schema$.events.ordinal DESC
+ LIMIT _count;
+END;
+$BODY$
+LANGUAGE plpgsql;
+
+
+CREATE OR REPLACE FUNCTION $schema$.read_stream_forward(_stream_id text, _count integer, _stream_version integer) RETURNS SETOF refcursor AS
+$BODY$
+DECLARE
+ ref1 refcursor;
+ ref2 refcursor;
+ ref3 refcursor;
+ _stream_id_internal integer;
+ _is_deleted boolean;
+BEGIN
+SELECT $schema$.streams.id_internal, $schema$.streams.is_deleted
+ INTO _stream_id_internal, _is_deleted
+ FROM $schema$.streams
+ WHERE $schema$.streams.id = _stream_id;
+
+OPEN ref1 FOR
+ SELECT _stream_id_internal, _is_deleted;
+RETURN NEXT ref1;
+
+
+OPEN ref2 FOR
+ SELECT
+ $schema$.events.stream_version,
+ $schema$.events.ordinal,
+ $schema$.events.id AS event_id,
+ $schema$.events.created,
+ $schema$.events.type,
+ $schema$.events.json_data,
+ $schema$.events.json_metadata
+ FROM $schema$.events
+ INNER JOIN $schema$.streams
+ ON $schema$.events.stream_id_internal = $schema$.streams.id_internal
+ WHERE $schema$.events.stream_id_internal = _stream_id_internal
+ AND $schema$.events.stream_version >= _stream_version
+ ORDER BY $schema$.events.ordinal
+ LIMIT _count;
+RETURN next ref2;
+
+OPEN ref3 FOR
+ SELECT $schema$.events.stream_version
+ FROM $schema$.events
+ WHERE $schema$.events.stream_id_internal = _stream_id_internal
+ ORDER BY $schema$.events.ordinal DESC
+ LIMIT 1;
+RETURN next ref3;
+
+RETURN;
+END;
+$BODY$
+LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION $schema$.read_stream_backward(_stream_id text, _count integer, _stream_version integer) RETURNS SETOF refcursor AS
+$BODY$
+DECLARE
+ ref1 refcursor;
+ ref2 refcursor;
+ ref3 refcursor;
+ _stream_id_internal integer;
+ _is_deleted boolean;
+BEGIN
+
+SELECT $schema$.streams.id_internal, $schema$.streams.is_deleted
+ INTO _stream_id_internal, _is_deleted
+ FROM $schema$.streams
+ WHERE $schema$.streams.id = _stream_id;
+
+OPEN ref1 FOR
+ SELECT _stream_id_internal, _is_deleted;
+RETURN NEXT ref1;
+
+
+OPEN ref2 FOR
+ SELECT
+ $schema$.events.stream_version,
+ $schema$.events.ordinal,
+ $schema$.events.id AS event_id,
+ $schema$.events.created,
+ $schema$.events.type,
+ $schema$.events.json_data,
+ $schema$.events.json_metadata
+ FROM $schema$.events
+ INNER JOIN $schema$.streams
+ ON $schema$.events.stream_id_internal = $schema$.streams.id_internal
+ WHERE $schema$.events.stream_id_internal = _stream_id_internal
+ AND $schema$.events.stream_version <= _stream_version
+ ORDER BY $schema$.events.ordinal DESC
+ LIMIT _count;
+RETURN next ref2;
+
+OPEN ref3 FOR
+ SELECT $schema$.events.stream_version
+ FROM $schema$.events
+ WHERE $schema$.events.stream_id_internal = _stream_id_internal
+ ORDER BY $schema$.events.ordinal DESC
+ LIMIT 1;
+RETURN next ref3;
+
+RETURN;
+END;
+$BODY$
+LANGUAGE plpgsql;
+
+
+CREATE OR REPLACE FUNCTION $schema$.delete_stream_any_version(stream_id text) RETURNS VOID AS
+$BODY$
+DECLARE
+ _stream_id_internal integer;
+BEGIN
+
+ SELECT $schema$.streams.id_internal
+ INTO _stream_id_internal
+ FROM $schema$.streams
+ WHERE $schema$.streams.id = stream_id;
+
+ DELETE FROM $schema$.events
+ WHERE $schema$.events.stream_id_internal = _stream_id_internal;
+
+ UPDATE $schema$.streams
+ SET is_deleted = true
+ WHERE $schema$.streams.id_internal = _stream_id_internal;
+
+RETURN;
+END;
+$BODY$
+LANGUAGE plpgsql;
+
+
+CREATE OR REPLACE FUNCTION $schema$.delete_stream_expected_version(stream_id text, expected_version integer) RETURNS VOID AS
+$BODY$
+DECLARE
+ _stream_id_internal integer;
+ _lastest_stream_version integer;
+BEGIN
+
+ SELECT $schema$.streams.id_internal
+ INTO _stream_id_internal
+ FROM $schema$.streams
+ WHERE $schema$.streams.id = stream_id;
+
+ IF _stream_id_internal IS NULL THEN
+ RAISE EXCEPTION 'WrongExpectedVersion'
+ USING HINT = 'The Stream ' || stream_id || ' does not exist.';
+ END IF;
+
+ SELECT stream_version
+ INTO _lastest_stream_version
+ FROM $schema$.events
+ WHERE $schema$.events.stream_id_internal = _stream_id_internal
+ ORDER BY $schema$.events.ordinal DESC
+ LIMIT 1;
+
+ IF (_lastest_stream_version <> expected_version) THEN
+ RAISE EXCEPTION 'WrongExpectedVersion'
+ USING HINT = 'The Stream ' || stream_id || ' version was expected to be ' || expected_version::text || ' but was version ' || _lastest_stream_version::text || '.' ;
+ END IF;
+
+ DELETE FROM $schema$.events
+ WHERE $schema$.events.stream_id_internal = _stream_id_internal;
+
+ UPDATE $schema$.streams
+ SET is_deleted = true
+ WHERE $schema$.streams.id_internal = _stream_id_internal;
+
+RETURN;
+END;
+$BODY$
+LANGUAGE plpgsql;
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllBackward.sql b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllBackward.sql
new file mode 100644
index 000000000..1fc9c7507
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllBackward.sql
@@ -0,0 +1,15 @@
+SELECT
+ streams.id_original As stream_id,
+ events.stream_version,
+ events.ordinal,
+ events.id AS event_id,
+ events.created,
+ events.type,
+ events.json_data,
+ events.json_metadata
+ FROM $schema$.events
+ INNER JOIN $schema$.streams
+ ON events.stream_id_internal = streams.id_internal
+ WHERE events.ordinal <= :ordinal
+ ORDER BY events.ordinal DESC
+LIMIT :count
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllForward.sql b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllForward.sql
new file mode 100644
index 000000000..1a03964a6
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllForward.sql
@@ -0,0 +1,15 @@
+SELECT
+ streams.id_original As stream_id,
+ events.stream_version,
+ events.ordinal,
+ events.id AS event_id,
+ events.created,
+ events.type,
+ events.json_data,
+ events.json_metadata
+ FROM $schema$.events
+ INNER JOIN $schema$.streams
+ ON events.stream_id_internal = streams.id_internal
+ WHERE events.ordinal >= :ordinal
+ ORDER BY events.ordinal
+LIMIT :count
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/Scripts.cs b/src/Cedar.EventStore.Postgres/SqlScripts/Scripts.cs
new file mode 100644
index 000000000..8a77f6734
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/SqlScripts/Scripts.cs
@@ -0,0 +1,118 @@
+namespace Cedar.EventStore.Postgres.SqlScripts
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.IO;
+
+ public class Scripts
+ {
+ private static readonly ConcurrentDictionary<string, string> s_scripts
+ = new ConcurrentDictionary<string, string>();
+
+ private readonly string _schema;
+
+ public Scripts(string schema = "public")
+ {
+ _schema = schema;
+ Functions = new GetFunctions(_schema);
+ }
+
+ public string InitializeStore
+ {
+ get { return GetScript("InitializeStore").Replace("$schema$", _schema); }
+ }
+
+ public string DropAll
+ {
+ get { return GetScript("DropAll").Replace("$schema$", _schema); }
+ }
+
+ public string BulkCopyEvents
+ {
+ get { return GetScript("BulkCopyEvents").Replace("$schema$", _schema); }
+ }
+
+ public string ReadAllForward
+ {
+ get { return GetScript("ReadAllForward").Replace("$schema$", _schema); }
+ }
+
+ public string ReadAllBackward
+ {
+ get { return GetScript("ReadAllBackward").Replace("$schema$", _schema); }
+ }
+
+ public GetFunctions Functions { get; private set; }
+
+ private string GetScript(string name)
+ {
+ return s_scripts.GetOrAdd(name,
+ key =>
+ {
+ using(Stream stream = typeof(Scripts)
+ .Assembly
+ .GetManifestResourceStream("Cedar.EventStore.Postgres.SqlScripts." + key + ".sql"))
+ {
+ if(stream == null)
+ {
+ throw new Exception("Embedded resource not found. BUG!");
+ }
+ using(StreamReader reader = new StreamReader(stream))
+ {
+ return reader.ReadToEnd();
+ }
+ }
+ });
+ }
+
+ public class GetFunctions
+ {
+ private readonly string _schema;
+
+ public GetFunctions(string schema)
+ {
+ _schema = schema;
+ }
+
+ public string CreateStream
+ {
+ get { return string.Concat(_schema, ".", "create_stream"); }
+ }
+
+ public string GetStream
+ {
+ get { return string.Concat(_schema, ".", "get_stream"); }
+ }
+
+ public string ReadAllForward
+ {
+ get { return string.Concat(_schema, ".", "read_all_forward"); }
+ }
+
+ public string ReadAllBackward
+ {
+ get { return string.Concat(_schema, ".", "read_all_backward"); }
+ }
+
+ public string ReadStreamForward
+ {
+ get { return string.Concat(_schema, ".", "read_stream_forward"); }
+ }
+
+ public string ReadStreamBackward
+ {
+ get { return string.Concat(_schema, ".", "read_stream_backward"); }
+ }
+
+ public string DeleteStreamAnyVersion
+ {
+ get { return string.Concat(_schema, ".", "delete_stream_any_version"); }
+ }
+
+ public string DeleteStreamExpectedVersion
+ {
+ get { return string.Concat(_schema, ".", "delete_stream_expected_version"); }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Postgres/packages.config b/src/Cedar.EventStore.Postgres/packages.config
new file mode 100644
index 000000000..9d1a043b2
--- /dev/null
+++ b/src/Cedar.EventStore.Postgres/packages.config
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs
index 0f5368a7c..4a3d73350 100644
--- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs
+++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs
@@ -97,5 +97,21 @@ await eventStore
}
}
}
+
+ [Fact]
+ public async Task When_append_stream_with_expected_version_any_and_no_stream_exists_should_not_throw()
+ {
+ // Idempotency
+ using (var fixture = GetFixture())
+ {
+ using (var eventStore = await fixture.GetEventStore())
+ {
+ const string streamId = "stream-1";
+ await eventStore
+ .AppendToStream(streamId, ExpectedVersion.Any, CreateNewStreamEvents(1, 2, 3))
+ .ShouldNotThrow();
+ }
+ }
+ }
}
}
diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs
index 18f926494..f61fb83f0 100644
--- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs
+++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs
@@ -11,15 +11,15 @@ public abstract partial class EventStoreAcceptanceTests
[Fact]
public async Task When_delete_existing_stream_with_no_expected_version_then_should_be_deleted()
{
- using(var fixture = GetFixture())
+ using (var fixture = GetFixture())
{
- using(var eventStore = await fixture.GetEventStore())
+ using (var eventStore = await fixture.GetEventStore())
{
const string streamId = "stream";
var events = new[]
{
- new NewStreamEvent(Guid.NewGuid(), "data", "meta"),
- new NewStreamEvent(Guid.NewGuid(), "data", "meta")
+ new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\""),
+ new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\"")
};
await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, events);
@@ -40,7 +40,7 @@ public async Task When_delete_stream_that_does_not_exist()
{
using (var eventStore = await fixture.GetEventStore())
{
- const string streamId = "stream";
+ const string streamId = "notexist";
Func<Task> act = () => eventStore.DeleteStream(streamId);
act.ShouldNotThrow();
@@ -58,8 +58,8 @@ public async Task When_delete_stream_with_a_matching_expected_version_then_shoul
const string streamId = "stream";
var events = new[]
{
- new NewStreamEvent(Guid.NewGuid(), "data", "meta"),
- new NewStreamEvent(Guid.NewGuid(), "data", "meta")
+ new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\""),
+ new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\"")
};
await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, events);
@@ -81,6 +81,7 @@ public async Task When_delete_a_stream_and_append_then_should_throw()
using (var eventStore = await fixture.GetEventStore())
{
const string streamId = "stream";
+
await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, CreateNewStreamEvents(1));
await eventStore.DeleteStream(streamId);
@@ -118,8 +119,8 @@ public async Task When_delete_stream_with_a_non_matching_expected_version_then_s
const string streamId = "stream";
var events = new[]
{
- new NewStreamEvent(Guid.NewGuid(), "data", "meta"),
- new NewStreamEvent(Guid.NewGuid(), "data", "meta")
+ new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\""),
+ new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\"")
};
await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, events);
diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs
index c112fb865..bfd5a2dfb 100644
--- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs
+++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs
@@ -1,5 +1,6 @@
namespace Cedar.EventStore
{
+ using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
@@ -52,6 +53,100 @@ public async Task Can_read_all_forwards()
}
}
+ [Fact]
+ public async Task Read_forwards_to_the_end_should_return_a_valid_Checkpoint()
+ {
+ using (var fixture = GetFixture())
+ {
+ using (var eventStore = await fixture.GetEventStore())
+ {
+ await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3, 4, 5, 6));
+
+ // read to the end of the stream
+ var allEventsPage = await eventStore.ReadAll(Checkpoint.Start, 4);
+
+ int count = 0; //counter is used to short circuit bad implementations that never return IsEnd = true
+
+ while (!allEventsPage.IsEnd && count < 20)
+ {
+ allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
+ count++;
+ }
+
+ allEventsPage.IsEnd.Should().BeTrue();
+
+ Checkpoint currentCheckpoint = allEventsPage.NextCheckpoint;
+ currentCheckpoint.Should().NotBeNull();
+
+ // read end of stream again, should be empty, should return same checkpoint
+ allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10);
+ count = 0;
+ while (!allEventsPage.IsEnd && count < 20)
+ {
+ allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
+ count++;
+ }
+
+ allEventsPage.StreamEvents.Should().BeEmpty();
+ allEventsPage.IsEnd.Should().BeTrue();
+ allEventsPage.NextCheckpoint.Should().NotBeNull();
+
+ currentCheckpoint = allEventsPage.NextCheckpoint;
+
+ // append some events then read again from the saved checkpoint, the next checkpoint should have moved
+ await eventStore.AppendToStream("stream-1", ExpectedVersion.Any, CreateNewStreamEvents(7, 8, 9));
+
+ allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10);
+ count = 0;
+ while (!allEventsPage.IsEnd && count < 20)
+ {
+ allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10);
+ count++;
+ }
+
+ allEventsPage.IsEnd.Should().BeTrue();
+ allEventsPage.NextCheckpoint.Should().NotBeNull();
+ allEventsPage.NextCheckpoint.Should().NotBe(currentCheckpoint.Value);
+ }
+ }
+ }
+
+ [Fact]
+ public async Task When_read_past_end_of_all()
+ {
+ using(var fixture = GetFixture())
+ {
+ using(var eventStore = await fixture.GetEventStore())
+ {
+ await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3));
+
+ bool isEnd = false;
+ int count = 0;
+ Checkpoint checkpoint = Checkpoint.Start;
+ while (!isEnd)
+ {
+ _testOutputHelper.WriteLine($"Loop {count}");
+
+ var streamEventsPage = await eventStore.ReadAll(checkpoint, 10);
+ _testOutputHelper.WriteLine($"FromCheckpoint = {streamEventsPage.FromCheckpoint}");
+ _testOutputHelper.WriteLine($"NextCheckpoint = {streamEventsPage.NextCheckpoint}");
+ _testOutputHelper.WriteLine($"IsEnd = {streamEventsPage.IsEnd}");
+ _testOutputHelper.WriteLine($"StreamEvents.Count = {streamEventsPage.StreamEvents.Count}");
+ _testOutputHelper.WriteLine("");
+
+ checkpoint = streamEventsPage.NextCheckpoint;
+ isEnd = streamEventsPage.IsEnd;
+ count++;
+
+ if(count > 100)
+ {
+ throw new Exception("omg wtf");
+ }
+ }
+ }
+ }
+ }
+
[Fact]
public async Task Can_read_all_backward()
{
diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs
index 8cdde47aa..c68bc733c 100644
--- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs
+++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs
@@ -46,6 +46,42 @@ public async Task Can_read_streams_forwards_and_backwards(ReadStreamTheory theor
}
}
+ [Theory]
+ [InlineData(ReadDirection.Forward, 0, 10)]
+ [InlineData(ReadDirection.Backward, StreamPosition.End, 10)]
+ public async Task Empty_Streams_return_StreamNotFound(ReadDirection direction, int start, int pageSize)
+ {
+ using(var fixture = GetFixture())
+ {
+ using(var eventStore = await fixture.GetEventStore())
+ {
+ var streamEventsPage =
+ await eventStore.ReadStream("stream-does-not-exist", start, pageSize, direction);
+
+ streamEventsPage.Status.Should().Be(PageReadStatus.StreamNotFound);
+ }
+ }
+ }
+
+ [Theory]
+ [InlineData(ReadDirection.Forward, 0, 10)]
+ [InlineData(ReadDirection.Backward, StreamPosition.End, 10)]
+ public async Task Deleted_Streams_return_StreamDeleted(ReadDirection direction, int start, int pageSize)
+ {
+ using (var fixture = GetFixture())
+ {
+ using (var eventStore = await fixture.GetEventStore())
+ {
+ await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3));
+ await eventStore.DeleteStream("stream-1", ExpectedVersion.Any);
+
+ var streamEventsPage =
+ await eventStore.ReadStream("stream-1", start, pageSize, direction);
+
+ streamEventsPage.Status.Should().Be(PageReadStatus.StreamDeleted);
+ }
+ }
+ }
public static IEnumerable