diff --git a/default.ps1 b/default.ps1 index bb86234ee..41a3a788c 100644 --- a/default.ps1 +++ b/default.ps1 @@ -47,6 +47,7 @@ task RunTests -depends Compile { Run-Tests "Cedar.EventStore.GetEventStore.Tests" Run-Tests "Cedar.EventStore.MsSql2008.Tests" Run-Tests "Cedar.EventStore.Sqlite.Tests" + Run-Tests "Cedar.EventStore.Postgres.Tests" } task ILMerge -depends Compile { @@ -69,6 +70,12 @@ task ILMerge -depends Compile { $inputDlls = "$dllDir\$mainDllName.dll" @( "EnsureThat" ) |% { $inputDlls = "$inputDlls $dllDir\$_.dll" } Invoke-Expression "$ilmergePath /targetplatform:v4 /internalize /allowDup /target:library /log /out:$mergedDir\$mainDllName.dll $inputDlls" + + $mainDllName = "Cedar.EventStore.Postgres" + $dllDir = "$srcDir\$mainDllName\bin\Release" + $inputDlls = "$dllDir\$mainDllName.dll" + @( "EnsureThat", "Npgsql" ) |% { $inputDlls = "$inputDlls $dllDir\$_.dll" } + Invoke-Expression "$ilmergePath /targetplatform:v4 /internalize /allowDup /target:library /log /out:$mergedDir\$mainDllName.dll $inputDlls" } task CreateNuGetPackages -depends ILMerge { diff --git a/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject b/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject index 8782fa034..724068ebc 100644 Binary files a/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject and b/src/Cedar.EventStore.GetEventStore.Tests/Cedar.EventStore.GetEventStore.Tests.v2.ncrunchproject differ diff --git a/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.csproj b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.csproj new file mode 100644 index 000000000..9b0869488 --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.csproj @@ -0,0 +1,102 @@ + + + + + + Debug + AnyCPU + {453FB5DB-99DC-42D3-9DFE-F81EDF98F5E3} + Library + Properties + Cedar.EventStore.Postgres.Tests + Cedar.EventStore.Postgres.Tests + v4.5.1 + 512 + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\packages\FluentAssertions.3.3.0\lib\net45\FluentAssertions.dll + True + + + ..\packages\FluentAssertions.3.3.0\lib\net45\FluentAssertions.Core.dll + True + + + ..\packages\Npgsql.3.1.0-unstable0000\lib\net45\Npgsql.dll + True + + + + + + + + + + ..\packages\xunit.abstractions.2.0.0\lib\net35\xunit.abstractions.dll + True + + + ..\packages\xunit.extensibility.core.2.0.0\lib\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.dll + True + + + + + + + + + + + + + + + + {148C90E9-0EA1-482E-94A9-F178294EFAC2} + Cedar.EventStore.Postgres + + + {F3FB96CF-4A3D-448D-A25E-6BC66E370BF6} + Cedar.EventStore.Tests + + + {3553E8E7-2C2A-45D5-BCB1-9AC7E5A209B2} + Cedar.EventStore + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.v2.ncrunchproject b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.v2.ncrunchproject new file mode 100644 index 000000000..724068ebc Binary files /dev/null and b/src/Cedar.EventStore.Postgres.Tests/Cedar.EventStore.Postgres.Tests.v2.ncrunchproject differ diff --git a/src/Cedar.EventStore.Postgres.Tests/ConcurrentLoadTests.cs b/src/Cedar.EventStore.Postgres.Tests/ConcurrentLoadTests.cs new file mode 100644 index 000000000..709377860 --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/ConcurrentLoadTests.cs @@ -0,0 +1,38 @@ +namespace Cedar.EventStore.Postgres.Tests +{ + using Exceptions; + using System; + using System.Threading.Tasks; + using Xunit; + using Xunit.Abstractions; + + public class ConcurrentLoadTests + { + public ConcurrentLoadTests(ITestOutputHelper testOutputHelper) + { } + + [Fact] + public async Task conccurrent_appends_might_throw_WrongExpectedVersionException_and_thats_ok() + { + var eventStore = new PostgresEventStore(@"Server=127.0.0.1;Port=5432;Database=cedar_tests;User Id=postgres;Password=postgres;"); + + await eventStore.DropAll(ignoreErrors: true); + await eventStore.InitializeStore(); + + using (eventStore) + { + for (var i = 0; i < 3; i++) + { + Parallel.For(0, 4, async (iteration) => + { + var streamId = string.Concat("stream-", iteration); + await eventStore + .AppendToStream(streamId, ExpectedVersion.Any, new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"metadata\"")) + .MightThrow("Append failed due to WrongExpectedVersion. Stream: {0}, Expected version: -2".FormatWith(streamId)); + }); + + } + } + } + } +} diff --git a/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreFixture.cs b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreFixture.cs new file mode 100644 index 000000000..a7ec33fa1 --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreFixture.cs @@ -0,0 +1,28 @@ +namespace Cedar.EventStore.Postgres.Tests +{ + using System.Threading.Tasks; + + public class PostgresEventStoreFixture : EventStoreAcceptanceTestFixture + { + private readonly string _schema; + public PostgresEventStoreFixture(string schema = "public") + { + _schema = schema; + } + + public override Task GetEventStore() + { + return GetEventStore(_schema); + } + + private async Task GetEventStore(string schema) + { + var eventStore = new PostgresEventStore(@"Server=127.0.0.1;Port=5432;Database=cedar_tests;User Id=postgres;Password=postgres;", schema); + + await eventStore.DropAll(ignoreErrors: true); + await eventStore.InitializeStore(); + + return eventStore; + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreTests.cs b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreTests.cs new file mode 100644 index 000000000..8563f64a5 --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/PostgresEventStoreTests.cs @@ -0,0 +1,18 @@ +namespace Cedar.EventStore.Postgres.Tests +{ + using System.Threading.Tasks; + using Xunit; + using Xunit.Abstractions; + + public class PostgresEventStoreTests : EventStoreAcceptanceTests + { + public PostgresEventStoreTests(ITestOutputHelper testOutputHelper) + : base(testOutputHelper) + {} + + protected override EventStoreAcceptanceTestFixture GetFixture() + { + return new PostgresEventStoreFixture(); + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres.Tests/Properties/AssemblyInfo.cs b/src/Cedar.EventStore.Postgres.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..996ba0a20 --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Cedar.EventStore.Postgres.Tests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Cedar.EventStore.Postgres.Tests")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("dcf6bdff-8e51-46ae-9bcd-94a6ab78b26a")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/Cedar.EventStore.Postgres.Tests/SecondarySchemaTests.cs b/src/Cedar.EventStore.Postgres.Tests/SecondarySchemaTests.cs new file mode 100644 index 000000000..ac6565e3d --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/SecondarySchemaTests.cs @@ -0,0 +1,30 @@ +namespace Cedar.EventStore.Postgres.Tests +{ + using System.Threading.Tasks; + using Xunit; + using Xunit.Abstractions; + + public class SecondarySchemaTests : EventStoreAcceptanceTests + { + public SecondarySchemaTests(ITestOutputHelper testOutputHelper) + : base(testOutputHelper) + {} + + protected override EventStoreAcceptanceTestFixture GetFixture() + { + return new PostgresEventStoreFixture("secondary_schema"); + } + + [Fact] + public async Task can_store_events_in_different_schemas() + { + using (var defaultStore = await GetFixture().GetEventStore()) + using (var secondaryStore = await new PostgresEventStoreFixture("saga_events").GetEventStore()) + { + const string streamId = "stream-1"; + await defaultStore.AppendToStream(streamId, ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3)); + await secondaryStore.AppendToStream(streamId, ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3)); + } + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres.Tests/TaskExtensions.cs b/src/Cedar.EventStore.Postgres.Tests/TaskExtensions.cs new file mode 100644 index 000000000..8077379cf --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/TaskExtensions.cs @@ -0,0 +1,26 @@ +namespace Cedar.EventStore.Postgres.Tests +{ + using FluentAssertions; + using System; + using System.Threading.Tasks; + + internal static class TaskExtensions + { + internal static async Task MightThrow(this Task task, string message) + { + try + { + await task; + } + catch (Exception ex) + { + ex.Should().BeOfType(); + ex.Message.Should().Be(message); + + return; + } + + //it didn't throw an exception, that's ok too + } + } +} diff --git a/src/Cedar.EventStore.Postgres.Tests/packages.config b/src/Cedar.EventStore.Postgres.Tests/packages.config new file mode 100644 index 000000000..be379f255 --- /dev/null +++ b/src/Cedar.EventStore.Postgres.Tests/packages.config @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.csproj b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.csproj new file mode 100644 index 000000000..f0077a787 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.csproj @@ -0,0 +1,92 @@ + + + + + Debug + AnyCPU + {148C90E9-0EA1-482E-94A9-F178294EFAC2} + Library + Properties + Cedar.EventStore.Postgres + Cedar.EventStore.Postgres + v4.5 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + bin\Release\Cedar.EventStore.Postgres.XML + + + + ..\packages\Ensure.That.2.0.0\lib\portable-net4+sl5+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\EnsureThat.dll\EnsureThat.dll + True + + + ..\packages\Npgsql.3.0.3\lib\net45\Npgsql.dll + True + + + + + + + + + + + + + Properties\SharedAssemblyInfo.cs + + + + + + + + + + + + + + + + + + + + + + + + + + + {3553E8E7-2C2A-45D5-BCB1-9AC7E5A209B2} + Cedar.EventStore + + + + + \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.nuspec b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.nuspec new file mode 100644 index 000000000..0a4683089 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.nuspec @@ -0,0 +1,29 @@ + + + + Cedar.EventStore.Postgres + 0.0.0.0 + Damian Hickey + Damian Hickey + https://github.com/damianh/Cedar.EventStore + https://github.com/damianh/Cedar.EventStore/blob/master/LICENSE + false + Cedar - Event Store - PostgreSQL + + PostgreSQL plugin for Cedar EventStore + + cqrs event-sourcing domain-driven-design postgres postgresql pgsql + + + + + + + + + + + + + + diff --git a/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.v2.ncrunchproject b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.v2.ncrunchproject new file mode 100644 index 000000000..724068ebc Binary files /dev/null and b/src/Cedar.EventStore.Postgres/Cedar.EventStore.Postgres.v2.ncrunchproject differ diff --git a/src/Cedar.EventStore.Postgres/CheckpointExtensions.cs b/src/Cedar.EventStore.Postgres/CheckpointExtensions.cs new file mode 100644 index 000000000..bb21ab6b6 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/CheckpointExtensions.cs @@ -0,0 +1,14 @@ +namespace Cedar.EventStore.Postgres +{ + internal static class CheckpointExtensions + { + internal static long GetOrdinal(this Checkpoint checkpoint) + { + if(ReferenceEquals(checkpoint, Checkpoint.Start)) + { + return -1; + } + return ReferenceEquals(checkpoint, Checkpoint.End) ? long.MaxValue : long.Parse(checkpoint.Value); + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/InterlockedBoolean.cs b/src/Cedar.EventStore.Postgres/InterlockedBoolean.cs new file mode 100644 index 000000000..304063631 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/InterlockedBoolean.cs @@ -0,0 +1,72 @@ +// +// Copyright 2013 Hans Wolff +// +// Source: https://gist.github.com/hanswolff/7926751 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + + +namespace Cedar.EventStore.Postgres +{ + using System.Threading; + + /// + /// Interlocked support for boolean values + /// + internal class InterlockedBoolean + { + private int _value; + + /// + /// Current value + /// + public bool Value + { + get { return this._value == 1; } + } + + /// + /// Initializes a new instance of + /// + /// initial value + public InterlockedBoolean(bool initialValue = false) + { + this._value = initialValue ? 1 : 0; + } + + /// + /// Sets a new value + /// + /// new value + /// the original value before any operation was performed + public bool Set(bool newValue) + { + var oldValue = Interlocked.Exchange(ref this._value, newValue ? 1 : 0); + return oldValue == 1; + } + + /// + /// Compares the current value and the comparand for equality and, if they are equal, + /// replaces the current value with the new value in an atomic/thread-safe operation. + /// + /// new value + /// value to compare the current value with + /// the original value before any operation was performed + public bool CompareExchange(bool newValue, bool comparand) + { + var oldValue = Interlocked.CompareExchange(ref this._value, newValue ? 1 : 0, comparand ? 1 : 0); + return oldValue == 1; + } + } +} diff --git a/src/Cedar.EventStore.Postgres/InterlockedBooleanExtensions.cs b/src/Cedar.EventStore.Postgres/InterlockedBooleanExtensions.cs new file mode 100644 index 000000000..ae8be7978 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/InterlockedBooleanExtensions.cs @@ -0,0 +1,10 @@ +namespace Cedar.EventStore.Postgres +{ + internal static class InterlockedBooleanExtensions + { + internal static bool EnsureCalledOnce(this InterlockedBoolean interlockedBoolean) + { + return interlockedBoolean.CompareExchange(true, false); + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/PostgresEventStore.cs b/src/Cedar.EventStore.Postgres/PostgresEventStore.cs new file mode 100644 index 000000000..db0c01f8e --- /dev/null +++ b/src/Cedar.EventStore.Postgres/PostgresEventStore.cs @@ -0,0 +1,557 @@ +namespace Cedar.EventStore.Postgres +{ + using System; + using System.Collections.Generic; + using System.Configuration; + using System.Data; + using System.Linq; + using System.Security.Cryptography; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + + using Cedar.EventStore.Exceptions; + using Cedar.EventStore.Postgres.SqlScripts; + + using EnsureThat; + + using Npgsql; + + using NpgsqlTypes; + + public class PostgresEventStore : IEventStore + { + private readonly Func> _createAndOpenConnection; + + private readonly InterlockedBoolean _isDisposed = new InterlockedBoolean(); + + private readonly Scripts _scripts; + + public PostgresEventStore(string connectionStringOrConnectionStringName, string schema = "public") + { + if(connectionStringOrConnectionStringName.IndexOf(';') > -1) + { + var builder = new NpgsqlConnectionStringBuilder(connectionStringOrConnectionStringName); + _createAndOpenConnection = async () => + { + var connection = new NpgsqlConnection(builder); + await connection.OpenAsync(); + return connection; + }; + } + else + { + _createAndOpenConnection = async () => + { + var connection = new NpgsqlConnection(ConfigurationManager.ConnectionStrings[connectionStringOrConnectionStringName].ConnectionString); + await connection.OpenAsync(); + return connection; + }; + } + + _scripts = new Scripts(schema); + } + + public async Task AppendToStream( + string streamId, + int expectedVersion, + IEnumerable events, + CancellationToken cancellationToken = default(CancellationToken)) + { + Ensure.That(streamId, "streamId").IsNotNullOrWhiteSpace(); + Ensure.That(expectedVersion, "expectedVersion").IsGte(-2); + Ensure.That(events, "events").IsNotNull(); + + var streamIdInfo = HashStreamId(streamId); + + using(var connection = await _createAndOpenConnection()) + using(var tx = connection.BeginTransaction(IsolationLevel.Serializable)) + { + int streamIdInternal = -1; + int currentVersion = expectedVersion; + bool isDeleted = false; + + if(expectedVersion == ExpectedVersion.NoStream) + { + try + { + using ( + var command = new NpgsqlCommand(_scripts.Functions.CreateStream, connection, tx) + { + CommandType + = + CommandType + .StoredProcedure + }) + { + command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId); + command.Parameters.AddWithValue(":stream_id_original", streamIdInfo.StreamIdOriginal); + + streamIdInternal = + (int)await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext(); + } + } + catch(NpgsqlException ex) + { + tx.Rollback(); + + if(ex.Code == "23505") + { + //not found + throw new WrongExpectedVersionException( + Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion), ex); + } + + throw; + } + } + else + { + using (var command = new NpgsqlCommand(_scripts.Functions.GetStream, connection, tx) { CommandType = CommandType.StoredProcedure }) + { + command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId); + + using(var dr = await command.ExecuteReaderAsync().NotOnCapturedContext()) + { + while (await dr.ReadAsync().NotOnCapturedContext()) + { + streamIdInternal = dr.GetInt32(0); + isDeleted = dr.GetBoolean(1); + + if (!isDeleted) + { + currentVersion = dr.IsDBNull(2) ? -1 : dr.GetInt32(2); + } + } + } + } + + if(isDeleted) + { + throw new StreamDeletedException(Messages.EventStreamIsDeleted.FormatWith(streamId)); + } + + if(expectedVersion != ExpectedVersion.Any && currentVersion != expectedVersion) + { + throw new WrongExpectedVersionException( + Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion)); + } + + if(streamIdInternal == -1) + { + // create the stream as it doesn't exist + + try + { + using ( + var command = new NpgsqlCommand(_scripts.Functions.CreateStream, connection, tx) + { + CommandType + = + CommandType + .StoredProcedure + }) + { + command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId); + command.Parameters.AddWithValue(":stream_id_original", streamIdInfo.StreamIdOriginal); + + streamIdInternal = + (int)await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext(); + } + } + catch (NpgsqlException ex) + { + if (ex.Code == "40001") + { + // could not serialize access due to read/write dependencies among transactions + throw new WrongExpectedVersionException( + Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion), ex); + } + + //if error code is 40001 the transaction is already rolled back + tx.Rollback(); + + throw; + } + } + } + + try + { + using (var writer = connection.BeginBinaryImport(_scripts.BulkCopyEvents) ) + { + foreach (var @event in events) + { + if (cancellationToken.IsCancellationRequested) + { + writer.Cancel(); + tx.Rollback(); + break; + } + + currentVersion++; + writer.StartRow(); + writer.Write(streamIdInternal, NpgsqlDbType.Integer); + writer.Write(currentVersion, NpgsqlDbType.Integer); + writer.Write(@event.EventId, NpgsqlDbType.Uuid); + writer.Write(SystemClock.GetUtcNow(), NpgsqlDbType.TimestampTZ); + writer.Write(@event.Type); + writer.Write(@event.JsonData, NpgsqlDbType.Json); + writer.Write(@event.JsonMetadata, NpgsqlDbType.Json); + } + } + + tx.Commit(); + } + catch (NpgsqlException ex) + { + if (ex.Code == "40001") + { + // could not serialize access due to read/write dependencies among transactions + throw new WrongExpectedVersionException( + Messages.AppendFailedWrongExpectedVersion.FormatWith(streamId, expectedVersion), ex); + } + + //if error code is 40001 the transaction is already rolled back + tx.Rollback(); + + throw; + } + } + } + + public Task DeleteStream( + string streamId, + int expectedVersion = ExpectedVersion.Any, + CancellationToken cancellationToken = default(CancellationToken)) + { + Ensure.That(streamId, "streamId").IsNotNullOrWhiteSpace(); + Ensure.That(expectedVersion, "expectedVersion").IsGte(-2); + + var streamIdInfo = HashStreamId(streamId); + + return expectedVersion == ExpectedVersion.Any + ? this.DeleteStreamAnyVersion(streamIdInfo, cancellationToken) + : this.DeleteStreamExpectedVersion(streamIdInfo, expectedVersion, cancellationToken); + } + + private async Task DeleteStreamAnyVersion( + StreamIdInfo streamIdInfo, + CancellationToken cancellationToken) + { + using (var connection = await _createAndOpenConnection()) + using (var command = new NpgsqlCommand(_scripts.Functions.DeleteStreamAnyVersion, connection) { CommandType = CommandType.StoredProcedure }) + { + command.Parameters.AddWithValue("stream_id", streamIdInfo.StreamId); + await command + .ExecuteNonQueryAsync(cancellationToken) + .NotOnCapturedContext(); + } + } + + + private async Task DeleteStreamExpectedVersion( + StreamIdInfo streamIdInfo, + int expectedVersion, + CancellationToken cancellationToken) + { + using (var connection = await _createAndOpenConnection()) + using (var command = new NpgsqlCommand(_scripts.Functions.DeleteStreamExpectedVersion, connection) { CommandType = CommandType.StoredProcedure }) + { + command.Parameters.AddWithValue("stream_id", streamIdInfo.StreamId); + command.Parameters.AddWithValue("expected_version", expectedVersion); + try + { + await command + .ExecuteNonQueryAsync(cancellationToken) + .NotOnCapturedContext(); + } + catch(NpgsqlException ex) + { + if(ex.MessageText == "WrongExpectedVersion") + { + throw new WrongExpectedVersionException(Messages.DeleteStreamFailedWrongExpectedVersion.FormatWith(streamIdInfo.StreamIdOriginal, expectedVersion)); + } + throw; + } + } + } + + public async Task ReadAll( + Checkpoint checkpoint, + int maxCount, + ReadDirection direction = ReadDirection.Forward, + CancellationToken cancellationToken = default(CancellationToken)) + { + Ensure.That(checkpoint, "checkpoint").IsNotNull(); + Ensure.That(maxCount, "maxCount").IsGt(0); + + if(this._isDisposed.Value) + { + throw new ObjectDisposedException("PostgresEventStore"); + } + + long ordinal = checkpoint.GetOrdinal(); + + var commandText = direction == ReadDirection.Forward ? _scripts.ReadAllForward : _scripts.ReadAllBackward; + + using (var connection = await _createAndOpenConnection()) + using (var command = new NpgsqlCommand(commandText, connection))// { CommandType = CommandType.StoredProcedure }) + { + command.Parameters.AddWithValue(":ordinal", NpgsqlDbType.Bigint, ordinal); + command.Parameters.AddWithValue(":count", NpgsqlDbType.Integer, maxCount + 1); //Read extra row to see if at end or not + + List streamEvents = new List(); + + long latestCheckpoint = ordinal; + + using(var reader = await command.ExecuteReaderAsync(cancellationToken).NotOnCapturedContext()) + { + if (!reader.HasRows) + { + return new AllEventsPage(checkpoint.Value, + checkpoint.Value, + true, + direction, + streamEvents.ToArray()); + } + + + while (await reader.ReadAsync(cancellationToken).NotOnCapturedContext()) + { + var streamId = reader.GetString(0); + var StreamVersion = reader.GetInt32(1); + ordinal = reader.GetInt64(2); + var eventId = reader.GetGuid(3); + var created = reader.GetDateTime(4); + var type = reader.GetString(5); + var jsonData = reader.GetString(6); + var jsonMetadata = reader.GetString(7); + + latestCheckpoint = ordinal; + + var streamEvent = new StreamEvent(streamId, + eventId, + StreamVersion, + ordinal.ToString(), + created, + type, + jsonData, + jsonMetadata); + + streamEvents.Add(streamEvent); + } + } + + bool isEnd = true; + string nextCheckpoint = latestCheckpoint.ToString(); + + if(streamEvents.Count == maxCount + 1) + { + isEnd = false; + streamEvents.RemoveAt(maxCount); + } + else + { + nextCheckpoint = (latestCheckpoint + 1).ToString(); + } + + + return new AllEventsPage(checkpoint.Value, + nextCheckpoint, + isEnd, + direction, + streamEvents.ToArray()); + } + } + + public async Task ReadStream( + string streamId, + int start, + int count, + ReadDirection direction = ReadDirection.Forward, + CancellationToken cancellationToken = default(CancellationToken)) + { + Ensure.That(streamId, "streamId").IsNotNull(); + Ensure.That(start, "start").IsGte(-1); + Ensure.That(count, "count").IsGte(0); + + var streamIdInfo = HashStreamId(streamId); + + var StreamVersion = start == StreamPosition.End ? int.MaxValue : start; + string commandText; + Func, int> getNextSequenceNumber; + if(direction == ReadDirection.Forward) + { + commandText = _scripts.Functions.ReadStreamForward; + getNextSequenceNumber = events => events.Count > 0 ? events.Last().StreamVersion + 1 : -1; //todo: review this + } + else + { + commandText = _scripts.Functions.ReadStreamBackward; + getNextSequenceNumber = events => events.Count > 0 ? events.Last().StreamVersion - 1 : -1; //todo: review this + } + + using (var connection = await _createAndOpenConnection()) + using (var tx = connection.BeginTransaction()) + using (var command = new NpgsqlCommand(commandText, connection, tx) { CommandType = CommandType.StoredProcedure }) + { + command.Parameters.AddWithValue(":stream_id", NpgsqlDbType.Text, streamIdInfo.StreamId); + command.Parameters.AddWithValue(":count", NpgsqlDbType.Integer, count + 1); //Read extra row to see if at end or not + command.Parameters.AddWithValue(":stream_version", NpgsqlDbType.Integer, StreamVersion); + + List streamEvents = new List(); + + using(var reader = await command.ExecuteReaderAsync(cancellationToken).NotOnCapturedContext()) + { + await reader.ReadAsync(cancellationToken).NotOnCapturedContext(); + bool doesNotExist = reader.IsDBNull(0); + if(doesNotExist) + { + return new StreamEventsPage( + streamId, PageReadStatus.StreamNotFound, start, -1, -1, direction, isEndOfStream: true); + } + + + // Read IsDeleted result set + var isDeleted = reader.GetBoolean(1); + if(isDeleted) + { + return new StreamEventsPage( + streamId, PageReadStatus.StreamDeleted, 0, 0, 0, direction, isEndOfStream: true); + } + + + // Read Events result set + await reader.NextResultAsync(cancellationToken).NotOnCapturedContext(); + while(await reader.ReadAsync(cancellationToken).NotOnCapturedContext()) + { + var StreamVersion1 = reader.GetInt32(0); + var ordinal = reader.GetInt64(1); + var eventId = reader.GetGuid(2); + var created = reader.GetDateTime(3); + var type = reader.GetString(4); + var jsonData = reader.GetString(5); + var jsonMetadata = reader.GetString(6); + + var streamEvent = new StreamEvent( + streamId, eventId, StreamVersion1, ordinal.ToString(), created, type, jsonData, jsonMetadata); + + streamEvents.Add(streamEvent); + } + + // Read last event revision result set + await reader.NextResultAsync(cancellationToken).NotOnCapturedContext(); + await reader.ReadAsync(cancellationToken).NotOnCapturedContext(); + var lastStreamVersion = reader.HasRows ? reader.GetInt32(0) : -1; //todo: figure out wtf going on here + + + bool isEnd = true; + if(streamEvents.Count == count + 1) + { + isEnd = false; + streamEvents.RemoveAt(count); + } + + return new StreamEventsPage( + streamId, + PageReadStatus.Success, + start, + getNextSequenceNumber(streamEvents), + lastStreamVersion, + direction, + isEnd, + streamEvents.ToArray()); + } + } + } + + public void Dispose() + { + if(this._isDisposed.EnsureCalledOnce()) + { + return; + } + //no clean up to do, lean on Npgsql connection pooling + } + + public async Task InitializeStore( + bool ignoreErrors = false, + CancellationToken cancellationToken = default(CancellationToken)) + { + using (var connection = await _createAndOpenConnection()) + using(var cmd = new NpgsqlCommand(_scripts.InitializeStore, connection)) + { + if (ignoreErrors) + { + await ExecuteAndIgnoreErrors(() => cmd.ExecuteNonQueryAsync(cancellationToken)) + .NotOnCapturedContext(); + } + else + { + await cmd.ExecuteNonQueryAsync(cancellationToken) + .NotOnCapturedContext(); + } + } + } + + public async Task DropAll( + bool ignoreErrors = false, + CancellationToken cancellationToken = default(CancellationToken)) + { + using (var connection = await _createAndOpenConnection()) + using(var cmd = new NpgsqlCommand(_scripts.DropAll, connection)) + { + if (ignoreErrors) + { + await ExecuteAndIgnoreErrors(() => cmd.ExecuteNonQueryAsync(cancellationToken)) + .NotOnCapturedContext(); + } + else + { + await cmd.ExecuteNonQueryAsync(cancellationToken) + .NotOnCapturedContext(); + } + } + } + + private static async Task ExecuteAndIgnoreErrors(Func> operation) + { + try + { + return await operation().NotOnCapturedContext(); + } + catch + { + return default(T); + } + } + + private static StreamIdInfo HashStreamId(string streamId) + { + Ensure.That(streamId, "streamId").IsNotNullOrWhiteSpace(); + + Guid _; + if(Guid.TryParse(streamId, out _)) + { + return new StreamIdInfo(streamId, streamId); + } + + byte[] hashBytes = SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(streamId)); + var hashedStreamId = BitConverter.ToString(hashBytes).Replace("-", ""); + return new StreamIdInfo(hashedStreamId, streamId); + } + + private class StreamIdInfo + { + public readonly string StreamId; + public readonly string StreamIdOriginal; + + public StreamIdInfo(string streamId, string streamIdOriginal) + { + this.StreamId = streamId; + this.StreamIdOriginal = streamIdOriginal; + } + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/Properties/AssemblyInfo.cs b/src/Cedar.EventStore.Postgres/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..240c04cf7 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/Properties/AssemblyInfo.cs @@ -0,0 +1,4 @@ +using System.Reflection; + +[assembly: AssemblyTitle("Cedar.EventStore.Postgres")] +[assembly: AssemblyDescription("")] \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/BulkCopyEvents.sql b/src/Cedar.EventStore.Postgres/SqlScripts/BulkCopyEvents.sql new file mode 100644 index 000000000..f422b0592 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/BulkCopyEvents.sql @@ -0,0 +1,2 @@ +COPY $schema$.events (stream_id_internal, stream_version, id, created, type, json_data, json_metadata) +FROM STDIN BINARY \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/CreateStream.sql b/src/Cedar.EventStore.Postgres/SqlScripts/CreateStream.sql new file mode 100644 index 000000000..1eacf7cde --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/CreateStream.sql @@ -0,0 +1,3 @@ +INSERT INTO $schema$.streams(id, id_original) +VALUES (:stream_id, :stream_id_original) +RETURNING id_internal; \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/Dev.sql b/src/Cedar.EventStore.Postgres/SqlScripts/Dev.sql new file mode 100644 index 000000000..3160e0320 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/Dev.sql @@ -0,0 +1,306 @@ +DROP TABLE IF EXISTS events; +DROP TABLE IF EXISTS streams; +DROP TYPE IF EXISTS new_stream_events; +CREATE TABLE streams( + id text NOT NULL, + id_original text NOT NULL, + id_internal SERIAL PRIMARY KEY NOT NULL, + is_deleted boolean DEFAULT (false) NOT NULL +); +CREATE UNIQUE INDEX ix_streams_id ON streams USING btree(id); + +CREATE TABLE events( + stream_id_internal integer NOT NULL, + stream_version integer NOT NULL, + ordinal SERIAL PRIMARY KEY NOT NULL , + id uuid NOT NULL, + created timestamp NOT NULL, + type text NOT NULL, + json_data json NOT NULL, + json_metadata json , + CONSTRAINT fk_events_streams FOREIGN KEY (stream_id_internal) REFERENCES streams(id_internal) +); + +CREATE UNIQUE INDEX ix_events_stream_id_internal_revision ON events USING btree(stream_id_internal, stream_version); + +CREATE TYPE new_stream_events AS ( + stream_version integer, + id uuid, + created timestamp, + type text, + json_data json, + json_metadata json +); + +-- ExpectedVersion.NoStream + +new_events new_stream_events; +stream_id text; + +stream_id = 'stream-1'; + + +INSERT INTO @newEvents + ( + Id , + [Type] , + JsonData , + JsonMetadata + ) VALUES + ('00000000-0000-0000-0000-000000000001', 'type1', '\"data1\"', '\"meta1\"'), + ('00000000-0000-0000-0000-000000000002', 'type2', '\"data2\"', '\"meta2\"'), + ('00000000-0000-0000-0000-000000000003', 'type3', '\"data3\"', '\"meta3\"'), + ('00000000-0000-0000-0000-000000000004', 'type4', '\"data4\"', '\"meta4\"'); + +-- Actual SQL statement of interest +SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; +BEGIN TRANSACTION CreateStream; + DECLARE @count AS INT; + DECLARE @streamIdInternal AS INT; + BEGIN + INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamId); + SELECT @streamIdInternal = SCOPE_IDENTITY(); + + INSERT INTO dbo.Events (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) + SELECT @streamIdInternal, + StreamVersion, + Id, + Created, + [Type], + JsonData, + JsonMetadata + FROM @newEvents; + + END; + SELECT @streamIdInternal; +COMMIT TRANSACTION CreateStream; + + +SET @streamId = 'stream-2'; +SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; +BEGIN TRANSACTION CreateStream; + BEGIN + INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamId); + SELECT @streamIdInternal = SCOPE_IDENTITY(); + + INSERT INTO dbo.Events (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata) + SELECT @streamIdInternal, + StreamVersion, + Id, + Created, + [Type], + JsonData, + JsonMetadata + FROM @newEvents + + END; + SELECT @streamIdInternal; +COMMIT TRANSACTION CreateStream; + +SELECT * FROM dbo.Streams; +SELECT * FROM dbo.Events; + +DECLARE @pageNumber AS INT, @rowspPage AS INT; +SET @pageNumber = 2; +SET @rowspPage = 5; + +/* SQL Server 2012+ */ + SELECT Streams.IdOriginal As StreamId, + Events.StreamVersion, + Events.Ordinal, + Events.Id AS EventId, + Events.Created, + Events.Type, + Events.JsonData, + Events.JsonMetadata + FROM Events + INNER JOIN Streams + ON Events.StreamIdInternal=Streams.IdInternal + ORDER BY Events.Ordinal + OFFSET ((@pageNumber - 1) * @rowspPage) ROWS + FETCH NEXT @rowspPage ROWS ONLY; + + /* SQL Server 2000+ */ + SELECT Id As StreamId, + StreamVersion, + Ordinal, + EventId, + Created, + [Type], + JsonData, + JsonMetadata + FROM ( + SELECT ROW_NUMBER() OVER(ORDER BY Events.Ordinal) AS NUMBER, + Events.StreamIdInternal, + Events.StreamVersion, + Events.Ordinal, + Events.Id AS EventId, + Events.Created, + Events.Type, + Events.JsonData, + Events.JsonMetadata + FROM Events + ) AS PageTable + INNER JOIN Streams + ON StreamIdInternal=Streams.IdInternal + WHERE NUMBER BETWEEN ((@pageNumber - 1) * @RowspPage + 1) AND (@pageNumber * @rowspPage) + ORDER BY NUMBER; + +DECLARE @ordinal AS INT, @count1 AS INT; +SET @ordinal = 2; +SET @count1 = 5 + +/* SQL Server 2008+ */ + SELECT TOP(@count1) + Streams.IdOriginal As StreamId, + Events.StreamVersion, + Events.Ordinal, + Events.Id AS EventId, + Events.Created, + Events.Type, + Events.JsonData, + Events.JsonMetadata + FROM Events + INNER JOIN Streams + ON Events.StreamIdInternal=Streams.IdInternal + WHERE Events.Ordinal >= @ordinal + ORDER BY Events.Ordinal; + + /* SQL Server 2008+ */ + SELECT TOP(@count1) + Streams.IdOriginal As StreamId, + Events.StreamVersion, + Events.Ordinal, + Events.Id AS EventId, + Events.Created, + Events.Type, + Events.JsonData, + Events.JsonMetadata + FROM Events + INNER JOIN Streams + ON Events.StreamIdInternal=Streams.IdInternal + WHERE Events.Ordinal <= @ordinal + ORDER BY Events.Ordinal DESC; + +/* Delete Streeam*/ +BEGIN TRANSACTION DeleteStream + SELECT @streamIdInternal = Streams.IdInternal + FROM Streams + WHERE Streams.Id = @streamId; + + DELETE FROM Events + WHERE Events.StreamIdInternal = @streamIdInternal; + + UPDATE Streams + SET IsDeleted = '1' + WHERE Streams.Id = @streamId; +COMMIT TRANSACTION DeleteStream + +SELECT * FROM dbo.Streams; +SELECT * FROM dbo.Events; + +/* ReadStreamForward */ + +SET @count = 5; +SET @streamId = 'stream-1'; +DECLARE @StreamVersion AS INT = 0 +DECLARE @isDeleted AS BIT; + + SELECT @streamIdInternal = Streams.IdInternal, + @isDeleted = Streams.IsDeleted + FROM Streams + WHERE Streams.Id = @streamId + + SELECT @isDeleted AS IsDeleted + + SELECT TOP(@count) + Events.StreamVersion, + Events.Ordinal, + Events.Id AS EventId, + Events.Created, + Events.Type, + Events.JsonData, + Events.JsonMetadata + FROM Events + INNER JOIN Streams + ON Events.StreamIdInternal = Streams.IdInternal + WHERE Events.StreamIDInternal = @streamIDInternal AND Events.StreamVersion >= @StreamVersion + ORDER BY Events.Ordinal; + + SELECT TOP(1) + Events.StreamVersion + FROM Events + WHERE Events.StreamIDInternal = @streamIDInternal + ORDER BY Events.Ordinal DESC; + +/* ReadStreamBackward */ + +SET @StreamVersion = 5; + + SELECT @streamIdInternal = Streams.IdInternal, + @isDeleted = Streams.IsDeleted + FROM Streams + WHERE Streams.Id = @streamId + + SELECT @isDeleted; + + SELECT TOP(@count) + Streams.IdOriginal As StreamId, + Streams.IsDeleted as IsDeleted, + Events.StreamVersion, + Events.Ordinal, + Events.Id AS EventId, + Events.Created, + Events.Type, + Events.JsonData, + Events.JsonMetadata + FROM Events + INNER JOIN Streams + ON Events.StreamIdInternal = Streams.IdInternal + WHERE Events.StreamIDInternal = @streamIDInternal AND Events.StreamVersion <= @StreamVersion + ORDER BY Events.Ordinal DESC + + SELECT TOP(1) + Events.StreamVersion + FROM Events + WHERE Events.StreamIDInternal = @streamIDInternal + ORDER BY Events.Ordinal DESC; + +/* Delete Stream with expected version */ +SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; +BEGIN TRANSACTION DeleteStream + DECLARE @streamIdInternal2 AS INT; + DECLARE @expectedStreamVersion AS INT = 3; + DECLARE @latestStreamVersion AS INT; + SET @streamId = 'stream-1'; + + SELECT @streamIdInternal2 = Streams.IdInternal + FROM Streams + WHERE Streams.Id = @streamId; + + IF @streamIdInternal2 IS NULL + BEGIN + ROLLBACK TRANSACTION DeleteStream; + RAISERROR('WrongExpectedVersion', 12,1); + END + + SELECT TOP(1) + @latestStreamVersion = Events.StreamVersion + FROM Events + WHERE Events.StreamIDInternal = @streamIdInternal2 + ORDER BY Events.Ordinal DESC; + + IF @latestStreamVersion != @expectedStreamVersion + BEGIN + ROLLBACK TRANSACTION DeleteStream; + RAISERROR('WrongExpectedVersion', 12,2); + END + + UPDATE Streams + SET IsDeleted = '1' + WHERE Streams.Id = @streamId ; + + DELETE FROM Events + WHERE Events.StreamIdInternal = @streamIdInternal2; + +COMMIT TRANSACTION DeleteStream \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/DropAll.sql b/src/Cedar.EventStore.Postgres/SqlScripts/DropAll.sql new file mode 100644 index 000000000..dd097b90a --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/DropAll.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS $schema$.events; +DROP TABLE IF EXISTS $schema$.streams; +DROP TYPE IF EXISTS $schema$.new_stream_events; \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/GetStream.sql b/src/Cedar.EventStore.Postgres/SqlScripts/GetStream.sql new file mode 100644 index 000000000..0cd6f7ffc --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/GetStream.sql @@ -0,0 +1,9 @@ +SELECT streams.id_internal, + streams.is_deleted, + events.stream_version +FROM $schema$.streams +LEFT JOIN $schema$.events + ON events.stream_id_internal = streams.id_internal +WHERE streams.id = :stream_id +ORDER BY events.ordinal DESC +LIMIT 1; \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/InitializeStore.sql b/src/Cedar.EventStore.Postgres/SqlScripts/InitializeStore.sql new file mode 100644 index 000000000..266d9e321 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/InitializeStore.sql @@ -0,0 +1,276 @@ + + +CREATE SCHEMA IF NOT EXISTS $schema$; + +CREATE TABLE $schema$.streams( + id_internal SERIAL PRIMARY KEY, + id text NOT NULL, + id_original text NOT NULL, + is_deleted boolean DEFAULT (false) NOT NULL +); + +CREATE UNIQUE INDEX ix_streams_id +ON $schema$.streams +USING btree(id); + +CREATE TABLE $schema$.events( + stream_id_internal integer NOT NULL, + stream_version integer NOT NULL, + ordinal SERIAL PRIMARY KEY NOT NULL , + id uuid NOT NULL, + created timestamp NOT NULL, + type text NOT NULL, + json_data json NOT NULL, + json_metadata json , + CONSTRAINT fk_events_streams FOREIGN KEY (stream_id_internal) REFERENCES $schema$.streams(id_internal) +); + +CREATE UNIQUE INDEX ix_events_stream_id_internal_revision +ON $schema$.events +USING btree(stream_id_internal, stream_version DESC, ordinal DESC); + +CREATE OR REPLACE FUNCTION $schema$.create_stream(_stream_id text, _stream_id_original text) +RETURNS integer AS +$BODY$ +DECLARE + _result integer; +BEGIN + INSERT INTO $schema$.streams(id, id_original) + VALUES (_stream_id, _stream_id_original) + RETURNING id_internal + INTO _result; + + RETURN _result; +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION $schema$.get_stream(_stream_id text) +RETURNS TABLE(id_internal integer, is_deleted boolean, stream_version integer) AS +$BODY$ +BEGIN + RETURN QUERY + SELECT $schema$.streams.id_internal, + $schema$.streams.is_deleted, + (SELECT max($schema$.events.stream_version) from $schema$.events where $schema$.events.stream_id_internal = $schema$.streams.id_internal) + FROM $schema$.streams + WHERE $schema$.streams.id = _stream_id; +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION $schema$.read_all_forward(_ordinal bigint, _count integer) +RETURNS TABLE(stream_id integer, stream_version integer, ordinal bigint, event_id uuid, created timestamp, type text, json_data json, json_metadata json) AS +$BODY$ +BEGIN + RETURN QUERY + SELECT + $schema$.streams.id_original As stream_id, + $schema$.events.stream_version, + $schema$.events.ordinal, + $schema$.events.id AS event_id, + $schema$.events.created, + $schema$.events.type, + $schema$.events.json_data, + $schema$.events.json_metadata + FROM $schema$.events + INNER JOIN $schema$.streams + ON $schema$.events.stream_id_internal = $schema$.streams.id_internal + WHERE $schema$.events.ordinal >= _ordinal + ORDER BY $schema$.events.ordinal + LIMIT _count; +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION $schema$.read_all_backward(_ordinal bigint, _count integer) +RETURNS TABLE(stream_id integer, stream_version integer, ordinal bigint, event_id uuid, created timestamp, type text, json_data json, json_metadata json) AS +$BODY$ +BEGIN + RETURN QUERY + SELECT + $schema$.streams.id_original As stream_id, + $schema$.events.stream_version, + $schema$.events.ordinal, + $schema$.events.id AS event_id, + $schema$.events.created, + $schema$.events.type, + $schema$.events.json_data, + $schema$.events.json_metadata + FROM $schema$.events + INNER JOIN $schema$.streams + ON $schema$.events.stream_id_internal = $schema$.streams.id_internal + WHERE $schema$.events.ordinal <= _ordinal + ORDER BY $schema$.events.ordinal DESC + LIMIT _count; +END; +$BODY$ +LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION $schema$.read_stream_forward(_stream_id text, _count integer, _stream_version integer) RETURNS SETOF refcursor AS +$BODY$ +DECLARE + ref1 refcursor; + ref2 refcursor; + ref3 refcursor; + _stream_id_internal integer; + _is_deleted boolean; +BEGIN +SELECT $schema$.streams.id_internal, $schema$.streams.is_deleted + INTO _stream_id_internal, _is_deleted + FROM $schema$.streams + WHERE $schema$.streams.id = _stream_id; + +OPEN ref1 FOR + SELECT _stream_id_internal, _is_deleted; +RETURN NEXT ref1; + + +OPEN ref2 FOR + SELECT + $schema$.events.stream_version, + $schema$.events.ordinal, + $schema$.events.id AS event_id, + $schema$.events.created, + $schema$.events.type, + $schema$.events.json_data, + $schema$.events.json_metadata + FROM $schema$.events + INNER JOIN $schema$.streams + ON $schema$.events.stream_id_internal = $schema$.streams.id_internal + WHERE $schema$.events.stream_id_internal = _stream_id_internal + AND $schema$.events.stream_version >= _stream_version + ORDER BY $schema$.events.ordinal + LIMIT _count; +RETURN next ref2; + +OPEN ref3 FOR + SELECT $schema$.events.stream_version + FROM $schema$.events + WHERE $schema$.events.stream_id_internal = _stream_id_internal + ORDER BY $schema$.events.ordinal DESC + LIMIT 1; +RETURN next ref3; + +RETURN; +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION $schema$.read_stream_backward(_stream_id text, _count integer, _stream_version integer) RETURNS SETOF refcursor AS +$BODY$ +DECLARE + ref1 refcursor; + ref2 refcursor; + ref3 refcursor; + _stream_id_internal integer; + _is_deleted boolean; +BEGIN + +SELECT $schema$.streams.id_internal, $schema$.streams.is_deleted + INTO _stream_id_internal, _is_deleted + FROM $schema$.streams + WHERE $schema$.streams.id = _stream_id; + +OPEN ref1 FOR + SELECT _stream_id_internal, _is_deleted; +RETURN NEXT ref1; + + +OPEN ref2 FOR + SELECT + $schema$.events.stream_version, + $schema$.events.ordinal, + $schema$.events.id AS event_id, + $schema$.events.created, + $schema$.events.type, + $schema$.events.json_data, + $schema$.events.json_metadata + FROM $schema$.events + INNER JOIN $schema$.streams + ON $schema$.events.stream_id_internal = $schema$.streams.id_internal + WHERE $schema$.events.stream_id_internal = _stream_id_internal + AND $schema$.events.stream_version <= _stream_version + ORDER BY $schema$.events.ordinal DESC + LIMIT _count; +RETURN next ref2; + +OPEN ref3 FOR + SELECT $schema$.events.stream_version + FROM $schema$.events + WHERE $schema$.events.stream_id_internal = _stream_id_internal + ORDER BY $schema$.events.ordinal DESC + LIMIT 1; +RETURN next ref3; + +RETURN; +END; +$BODY$ +LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION $schema$.delete_stream_any_version(stream_id text) RETURNS VOID AS +$BODY$ +DECLARE + _stream_id_internal integer; +BEGIN + + SELECT $schema$.streams.id_internal + INTO _stream_id_internal + FROM $schema$.streams + WHERE $schema$.streams.id = stream_id; + + DELETE FROM $schema$.events + WHERE $schema$.events.stream_id_internal = _stream_id_internal; + + UPDATE $schema$.streams + SET is_deleted = true + WHERE $schema$.streams.id_internal = _stream_id_internal; + +RETURN; +END; +$BODY$ +LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION $schema$.delete_stream_expected_version(stream_id text, expected_version integer) RETURNS VOID AS +$BODY$ +DECLARE + _stream_id_internal integer; + _lastest_stream_version integer; +BEGIN + + SELECT $schema$.streams.id_internal + INTO _stream_id_internal + FROM $schema$.streams + WHERE $schema$.streams.id = stream_id; + + IF _stream_id_internal IS NULL THEN + RAISE EXCEPTION 'WrongExpectedVersion' + USING HINT = 'The Stream ' || stream_id || ' does not exist.'; + END IF; + + SELECT stream_version + INTO _lastest_stream_version + FROM $schema$.events + WHERE $schema$.events.stream_id_internal = _stream_id_internal + ORDER BY $schema$.events.ordinal DESC + LIMIT 1; + + IF (_lastest_stream_version <> expected_version) THEN + RAISE EXCEPTION 'WrongExpectedVersion' + USING HINT = 'The Stream ' || stream_id || 'version was expected to be' || expected_version::text || ' but was version ' || _lastest_stream_version::text || '.' ; + END IF; + + DELETE FROM $schema$.events + WHERE $schema$.events.stream_id_internal = _stream_id_internal; + + UPDATE $schema$.streams + SET is_deleted = true + WHERE $schema$.streams.id_internal = _stream_id_internal; + +RETURN; +END; +$BODY$ +LANGUAGE plpgsql; \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllBackward.sql b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllBackward.sql new file mode 100644 index 000000000..1fc9c7507 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllBackward.sql @@ -0,0 +1,15 @@ +SELECT + streams.id_original As stream_id, + events.stream_version, + events.ordinal, + events.id AS event_id, + events.created, + events.type, + events.json_data, + events.json_metadata + FROM $schema$.events + INNER JOIN $schema$.streams + ON events.stream_id_internal = streams.id_internal + WHERE events.ordinal <= :ordinal + ORDER BY events.ordinal DESC +LIMIT :count \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllForward.sql b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllForward.sql new file mode 100644 index 000000000..1a03964a6 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/ReadAllForward.sql @@ -0,0 +1,15 @@ +SELECT + streams.id_original As stream_id, + events.stream_version, + events.ordinal, + events.id AS event_id, + events.created, + events.type, + events.json_data, + events.json_metadata + FROM $schema$.events + INNER JOIN $schema$.streams + ON events.stream_id_internal = streams.id_internal + WHERE events.ordinal >= :ordinal + ORDER BY events.ordinal +LIMIT :count \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/SqlScripts/Scripts.cs b/src/Cedar.EventStore.Postgres/SqlScripts/Scripts.cs new file mode 100644 index 000000000..8a77f6734 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/SqlScripts/Scripts.cs @@ -0,0 +1,118 @@ +namespace Cedar.EventStore.Postgres.SqlScripts +{ + using System; + using System.Collections.Concurrent; + using System.IO; + + public class Scripts + { + private static readonly ConcurrentDictionary s_scripts + = new ConcurrentDictionary(); + + private readonly string _schema; + + public Scripts(string schema = "public") + { + _schema = schema; + Functions = new GetFunctions(_schema); + } + + public string InitializeStore + { + get { return GetScript("InitializeStore").Replace("$schema$", _schema); } + } + + public string DropAll + { + get { return GetScript("DropAll").Replace("$schema$", _schema); } + } + + public string BulkCopyEvents + { + get { return GetScript("BulkCopyEvents").Replace("$schema$", _schema); } + } + + public string ReadAllForward + { + get { return GetScript("ReadAllForward").Replace("$schema$", _schema); } + } + + public string ReadAllBackward + { + get { return GetScript("ReadAllBackward").Replace("$schema$", _schema); } + } + + public GetFunctions Functions { get; private set; } + + private string GetScript(string name) + { + return s_scripts.GetOrAdd(name, + key => + { + using(Stream stream = typeof(Scripts) + .Assembly + .GetManifestResourceStream("Cedar.EventStore.Postgres.SqlScripts." + key + ".sql")) + { + if(stream == null) + { + throw new Exception("Embedded resource not found. BUG!"); + } + using(StreamReader reader = new StreamReader(stream)) + { + return reader.ReadToEnd(); + } + } + }); + } + + public class GetFunctions + { + private readonly string _schema; + + public GetFunctions(string schema) + { + _schema = schema; + } + + public string CreateStream + { + get { return string.Concat(_schema, ".", "create_stream"); } + } + + public string GetStream + { + get { return string.Concat(_schema, ".", "get_stream"); } + } + + public string ReadAllForward + { + get { return string.Concat(_schema, ".", "read_all_forward"); } + } + + public string ReadAllBackward + { + get { return string.Concat(_schema, ".", "read_all_backward"); } + } + + public string ReadStreamForward + { + get { return string.Concat(_schema, ".", "read_stream_forward"); } + } + + public string ReadStreamBackward + { + get { return string.Concat(_schema, ".", "read_stream_backward"); } + } + + public string DeleteStreamAnyVersion + { + get { return string.Concat(_schema, ".", "delete_stream_any_version"); } + } + + public string DeleteStreamExpectedVersion + { + get { return string.Concat(_schema, ".", "delete_stream_expected_version"); } + } + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.Postgres/packages.config b/src/Cedar.EventStore.Postgres/packages.config new file mode 100644 index 000000000..9d1a043b2 --- /dev/null +++ b/src/Cedar.EventStore.Postgres/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs index 0f5368a7c..4a3d73350 100644 --- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs +++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.AppendStream.cs @@ -97,5 +97,21 @@ await eventStore } } } + + [Fact] + public async Task When_append_stream_with_expected_version_any_and_no_stream_exists_should_not_throw() + { + // Idempotency + using (var fixture = GetFixture()) + { + using (var eventStore = await fixture.GetEventStore()) + { + const string streamId = "stream-1"; + await eventStore + .AppendToStream(streamId, ExpectedVersion.Any, CreateNewStreamEvents(1, 2, 3)) + .ShouldNotThrow(); + } + } + } } } diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs index 18f926494..f61fb83f0 100644 --- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs +++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.DeleteStream.cs @@ -11,15 +11,15 @@ public abstract partial class EventStoreAcceptanceTests [Fact] public async Task When_delete_existing_stream_with_no_expected_version_then_should_be_deleted() { - using(var fixture = GetFixture()) + using (var fixture = GetFixture()) { - using(var eventStore = await fixture.GetEventStore()) + using (var eventStore = await fixture.GetEventStore()) { const string streamId = "stream"; var events = new[] { - new NewStreamEvent(Guid.NewGuid(), "data", "meta"), - new NewStreamEvent(Guid.NewGuid(), "data", "meta") + new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\""), + new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\"") }; await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, events); @@ -40,7 +40,7 @@ public async Task When_delete_stream_that_does_not_exist() { using (var eventStore = await fixture.GetEventStore()) { - const string streamId = "stream"; + const string streamId = "notexist"; Func act = () => eventStore.DeleteStream(streamId); act.ShouldNotThrow(); @@ -58,8 +58,8 @@ public async Task When_delete_stream_with_a_matching_expected_version_then_shoul const string streamId = "stream"; var events = new[] { - new NewStreamEvent(Guid.NewGuid(), "data", "meta"), - new NewStreamEvent(Guid.NewGuid(), "data", "meta") + new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\""), + new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\"") }; await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, events); @@ -81,6 +81,7 @@ public async Task When_delete_a_stream_and_append_then_should_throw() using (var eventStore = await fixture.GetEventStore()) { const string streamId = "stream"; + await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, CreateNewStreamEvents(1)); await eventStore.DeleteStream(streamId); @@ -118,8 +119,8 @@ public async Task When_delete_stream_with_a_non_matching_expected_version_then_s const string streamId = "stream"; var events = new[] { - new NewStreamEvent(Guid.NewGuid(), "data", "meta"), - new NewStreamEvent(Guid.NewGuid(), "data", "meta") + new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\""), + new NewStreamEvent(Guid.NewGuid(), "type", "\"data\"", "\"headers\"") }; await eventStore.AppendToStream(streamId, ExpectedVersion.NoStream, events); diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs index c112fb865..bfd5a2dfb 100644 --- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs +++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadAll.cs @@ -1,5 +1,6 @@ namespace Cedar.EventStore { + using System; using System.Collections.Generic; using System.Threading.Tasks; using FluentAssertions; @@ -52,6 +53,100 @@ public async Task Can_read_all_forwards() } } + [Fact] + public async Task Read_forwards_to_the_end_should_return_a_valid_Checkpoint() + { + using (var fixture = GetFixture()) + { + using (var eventStore = await fixture.GetEventStore()) + { + await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3, 4, 5, 6)); + + // read to the end of the stream + var allEventsPage = await eventStore.ReadAll(Checkpoint.Start, 4); + + int count = 0; //counter is used to short circuit bad implementations that never return IsEnd = true + + while (!allEventsPage.IsEnd && count < 20) + { + allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10); + count++; + } + + allEventsPage.IsEnd.Should().BeTrue(); + + Checkpoint currentCheckpoint = allEventsPage.NextCheckpoint; + currentCheckpoint.Should().NotBeNull(); + + // read end of stream again, should be empty, should return same checkpoint + allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10); + count = 0; + while (!allEventsPage.IsEnd && count < 20) + { + allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10); + count++; + } + + allEventsPage.StreamEvents.Should().BeEmpty(); + allEventsPage.IsEnd.Should().BeTrue(); + allEventsPage.NextCheckpoint.Should().NotBeNull(); + + currentCheckpoint = allEventsPage.NextCheckpoint; + + // append some events then read again from the saved checkpoint, the next checkpoint should have moved + await eventStore.AppendToStream("stream-1", ExpectedVersion.Any, CreateNewStreamEvents(7, 8, 9)); + + allEventsPage = await eventStore.ReadAll(currentCheckpoint, 10); + count = 0; + while (!allEventsPage.IsEnd && count < 20) + { + allEventsPage = await eventStore.ReadAll(allEventsPage.NextCheckpoint, 10); + count++; + } + + allEventsPage.IsEnd.Should().BeTrue(); + allEventsPage.NextCheckpoint.Should().NotBeNull(); + allEventsPage.NextCheckpoint.Should().NotBe(currentCheckpoint.Value); + } + } + } + + [Fact] + public async Task When_read_past_end_of_all() + { + using(var fixture = GetFixture()) + { + using(var eventStore = await fixture.GetEventStore()) + { + await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3)); + + bool isEnd = false; + int count = 0; + Checkpoint checkpoint = Checkpoint.Start; + while (!isEnd) + { + _testOutputHelper.WriteLine($"Loop {count}"); + + var streamEventsPage = await eventStore.ReadAll(checkpoint, 10); + _testOutputHelper.WriteLine($"FromCheckpoint = {streamEventsPage.FromCheckpoint}"); + _testOutputHelper.WriteLine($"NextCheckpoint = {streamEventsPage.NextCheckpoint}"); + _testOutputHelper.WriteLine($"IsEnd = {streamEventsPage.IsEnd}"); + _testOutputHelper.WriteLine($"StreamEvents.Count = {streamEventsPage.StreamEvents.Count}"); + _testOutputHelper.WriteLine(""); + + checkpoint = streamEventsPage.NextCheckpoint; + isEnd = streamEventsPage.IsEnd; + count++; + + if(count > 100) + { + throw new Exception("omg wtf"); + } + } + } + } + } + [Fact] public async Task Can_read_all_backward() { diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs index 8cdde47aa..c68bc733c 100644 --- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs +++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.ReadStream.cs @@ -46,6 +46,42 @@ public async Task Can_read_streams_forwards_and_backwards(ReadStreamTheory theor } } + [Theory] + [InlineData(ReadDirection.Forward, 0, 10)] + [InlineData(ReadDirection.Backward, StreamPosition.End, 10)] + public async Task Empty_Streams_return_StreamNotFound(ReadDirection direction, int start, int pageSize) + { + using(var fixture = GetFixture()) + { + using(var eventStore = await fixture.GetEventStore()) + { + var streamEventsPage = + await eventStore.ReadStream("stream-does-not-exist", start, pageSize, direction); + + streamEventsPage.Status.Should().Be(PageReadStatus.StreamNotFound); + } + } + } + + [Theory] + [InlineData(ReadDirection.Forward, 0, 10)] + [InlineData(ReadDirection.Backward, StreamPosition.End, 10)] + public async Task Deleted_Streams_return_StreamDeleted(ReadDirection direction, int start, int pageSize) + { + using (var fixture = GetFixture()) + { + using (var eventStore = await fixture.GetEventStore()) + { + await eventStore.AppendToStream("stream-1", ExpectedVersion.NoStream, CreateNewStreamEvents(1, 2, 3)); + await eventStore.DeleteStream("stream-1", ExpectedVersion.Any); + + var streamEventsPage = + await eventStore.ReadStream("stream-1", start, pageSize, direction); + + streamEventsPage.Status.Should().Be(PageReadStatus.StreamDeleted); + } + } + } public static IEnumerable GetReadStreamTheories() { var theories = new[] diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.cs b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.cs index 12bbd42ce..6dc6b1404 100644 --- a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.cs +++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.cs @@ -44,25 +44,25 @@ public async Task Can_dispose_more_than_once() act.ShouldNotThrow(); } } - private static NewStreamEvent[] CreateNewStreamEvents(params int[] eventNumbers) + public static NewStreamEvent[] CreateNewStreamEvents(params int[] eventNumbers) { return eventNumbers .Select(eventNumber => { var eventId = Guid.Parse("00000000-0000-0000-0000-" + eventNumber.ToString().PadLeft(12, '0')); - return new NewStreamEvent(eventId, "type", "data", "metadata"); + return new NewStreamEvent(eventId, "type", "\"data\"", "\"metadata\""); }) .ToArray(); } - private static StreamEvent ExpectedStreamEvent( + public static StreamEvent ExpectedStreamEvent( string streamId, int eventNumber, int sequenceNumber, DateTime created) { var eventId = Guid.Parse("00000000-0000-0000-0000-" + eventNumber.ToString().PadLeft(12, '0')); - return new StreamEvent(streamId, eventId, sequenceNumber, null, created, "type", "data", "metadata"); + return new StreamEvent(streamId, eventId, sequenceNumber, null, created, "type", "\"data\"", "\"metadata\""); } } } \ No newline at end of file diff --git a/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.cs.orig b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.cs.orig new file mode 100644 index 000000000..eeac70c16 --- /dev/null +++ b/src/Cedar.EventStore.Tests/EventStoreAcceptanceTests.cs.orig @@ -0,0 +1,92 @@ +namespace Cedar.EventStore +{ + using System; + using System.Linq; + using System.Threading.Tasks; + using FluentAssertions; + using Xunit; + using Xunit.Abstractions; + + public abstract partial class EventStoreAcceptanceTests + { + private readonly ITestOutputHelper _testOutputHelper; + protected abstract EventStoreAcceptanceTestFixture GetFixture(); + + protected EventStoreAcceptanceTests(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + +<<<<<<< HEAD + private static NewStreamEvent[] CreateNewStreamEvents(params int[] eventNumbers) + { + return eventNumbers + .Select(eventNumber => + { + var eventId = Guid.Parse("00000000-0000-0000-0000-" + eventNumber.ToString().PadLeft(12, '0')); + return new NewStreamEvent(eventId, "type", "\"data\"", "\"headers\""); + }) + .ToArray(); + } + + private static StreamEvent ExpectedStreamEvent( + string streamId, + int eventNumber, + int sequenceNumber, + DateTime created) + { + var eventId = Guid.Parse("00000000-0000-0000-0000-" + eventNumber.ToString().PadLeft(12, '0')); + return new StreamEvent(streamId, eventId, sequenceNumber, null, created, "type", "\"data\"", "\"headers\""); + } + +======= +>>>>>>> upstream/master + [Fact] + public async Task When_dispose_and_read_then_should_throw() + { + using(var fixture = GetFixture()) + { + var store = await fixture.GetEventStore(); + store.Dispose(); + + Func act = () => store.ReadAll(Checkpoint.Start, 10); + + act.ShouldThrow(); + } + } + + [Fact] + public async Task Can_dispose_more_than_once() + { + using (var fixture = GetFixture()) + { + var store = await fixture.GetEventStore(); + store.Dispose(); + + Action act = store.Dispose; + + act.ShouldNotThrow(); + } + } + private static NewStreamEvent[] CreateNewStreamEvents(params int[] eventNumbers) + { + return eventNumbers + .Select(eventNumber => + { + var eventId = Guid.Parse("00000000-0000-0000-0000-" + eventNumber.ToString().PadLeft(12, '0')); + return new NewStreamEvent(eventId, "type", "data", "metadata"); + }) + .ToArray(); + } + + private static StreamEvent ExpectedStreamEvent( + string streamId, + int eventNumber, + int sequenceNumber, + DateTime created) + { + var eventId = Guid.Parse("00000000-0000-0000-0000-" + eventNumber.ToString().PadLeft(12, '0')); + return new StreamEvent(streamId, eventId, sequenceNumber, null, created, "type", "data", "metadata"); + } + } +} \ No newline at end of file diff --git a/src/Cedar.EventStore.sln b/src/Cedar.EventStore.sln index 9d5d44b36..082a04fc6 100644 --- a/src/Cedar.EventStore.sln +++ b/src/Cedar.EventStore.sln @@ -18,6 +18,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cedar.EventStore.MsSql2008" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cedar.EventStore.MsSql2008.Tests", "Cedar.EventStore.MsSql2008.Tests\Cedar.EventStore.MsSql2008.Tests.csproj", "{97AA016B-0B9F-44C2-8228-A13B4E251FB0}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cedar.EventStore.Postgres", "Cedar.EventStore.Postgres\Cedar.EventStore.Postgres.csproj", "{148C90E9-0EA1-482E-94A9-F178294EFAC2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cedar.EventStore.Postgres.Tests", "Cedar.EventStore.Postgres.Tests\Cedar.EventStore.Postgres.Tests.csproj", "{453FB5DB-99DC-42D3-9DFE-F81EDF98F5E3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -56,6 +60,14 @@ Global {97AA016B-0B9F-44C2-8228-A13B4E251FB0}.Debug|Any CPU.Build.0 = Debug|Any CPU {97AA016B-0B9F-44C2-8228-A13B4E251FB0}.Release|Any CPU.ActiveCfg = Release|Any CPU {97AA016B-0B9F-44C2-8228-A13B4E251FB0}.Release|Any CPU.Build.0 = Release|Any CPU + {148C90E9-0EA1-482E-94A9-F178294EFAC2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {148C90E9-0EA1-482E-94A9-F178294EFAC2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {148C90E9-0EA1-482E-94A9-F178294EFAC2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {148C90E9-0EA1-482E-94A9-F178294EFAC2}.Release|Any CPU.Build.0 = Release|Any CPU + {453FB5DB-99DC-42D3-9DFE-F81EDF98F5E3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {453FB5DB-99DC-42D3-9DFE-F81EDF98F5E3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {453FB5DB-99DC-42D3-9DFE-F81EDF98F5E3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {453FB5DB-99DC-42D3-9DFE-F81EDF98F5E3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Cedar.EventStore.sln.GhostDoc.xml b/src/Cedar.EventStore.sln.GhostDoc.xml new file mode 100644 index 000000000..0c987e9b2 --- /dev/null +++ b/src/Cedar.EventStore.sln.GhostDoc.xml @@ -0,0 +1,6 @@ + + + *.min.js + jquery*.js + + diff --git a/src/nuget.config b/src/nuget.config new file mode 100644 index 000000000..183217403 --- /dev/null +++ b/src/nuget.config @@ -0,0 +1,3 @@ + + + \ No newline at end of file