8000 Allow multiple concurrent filter streams · leoniDEV/libgit2sharp@1ba31db · GitHub
[go: up one dir, main page]

Skip to content

Commit 1ba31db

Browse files
committed
Allow multiple concurrent filter streams
The current implementation of filters in libgit2sharp only allows for one filter stream per type of filter at any given point in time. This switches the storage of the necessary state from instance variables on the Filter class to a dedicated state object which is tracked in a dictionary keyed on the libgit2 stream pointer.
1 parent 45b321b commit 1ba31db

File tree

1 file changed

+87
-35
lines changed

1 file changed

+87
-35
lines changed

LibGit2Sharp/Filter.cs

Lines changed: 87 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
4+
using System.Diagnostics;
5+
using System.Globalization;
36
using System.IO;
47
using System.Linq;
58
using System.Runtime.InteropServices;
@@ -47,18 +50,34 @@ protected Filter(string name, IEnumerable<FilterAttributeEntry> attributes)
4750
~Filter()
4851
{
4952
GlobalSettings.DeregisterFilter(this);
53+
54+
#if LEAKS_IDENTIFYING
55+
int activeStreamCount = activeStreams.Count;
56+
if (activeStreamCount > 0)
57+
{
58+
Trace.WriteLine(string.Format(CultureInfo.InvariantCulture, "{0} leaked {1} stream handles at finalization", GetType().Name, activeStreamCount));
59+
}
60+
#endif
5061
}
5162

5263
private readonly string name;
5364
private readonly IEnumerable<FilterAttributeEntry> attributes;
5465
private readonly GitFilter gitFilter;
66+
private readonly ConcurrentDictionary<IntPtr, StreamState> activeStreams = new ConcurrentDictionary<IntPtr, StreamState>();
5567

56-
private GitWriteStream thisStream;
57-
private GitWriteStream nextStream;
58-
private IntPtr thisPtr;
59-
private IntPtr nextPtr;
60-
private FilterSource filterSource;
61-
private Stream output;
68+
/// <summary>
69+
/// State bag used to keep necessary reference from being
70+
/// garbage collected during filter processing.
71+
/// </summary>
72+
private class StreamState
73+
{
74+
public GitWriteStream thisStream;
75+
public GitWriteStream nextStream;
76+
public IntPtr thisPtr;
77+
public IntPtr nextPtr;
78+
public FilterSource filterSource;
79+
public Stream output;
80+
}
6281

6382
/// <summary>
6483
/// The name that this filter was registered with
@@ -226,33 +245,44 @@ int InitializeCallback(IntPtr filterPointer)
226245
int StreamCreateCallback(out IntPtr git_writestream_out, GitFilter self, IntPtr payload, IntPtr filterSourcePtr, IntPtr git_writestream_next)
227246
{
228247
int result = 0;
248+
var state = new StreamState();
229249

230250
try
231251
{
232252
Ensure.ArgumentNotZeroIntPtr(filterSourcePtr, "filterSourcePtr");
233253
Ensure.ArgumentNotZeroIntPtr(git_writestream_next, "git_writestream_next");
234254

235-
thisStream = new GitWriteStream();
236-
thisStream.close = StreamCloseCallback;
237-
thisStream.write = StreamWriteCallback;
238-
thisStream.free = StreamFreeCallback;
239-
thisPtr = Marshal.AllocHGlobal(Marshal.SizeOf(thisStream));
240-
Marshal.StructureToPtr(thisStream, thisPtr, false);
241-
nextPtr = git_writestream_next;
242-
nextStream = new GitWriteStream();
243-
Marshal.PtrToStructure(nextPtr, nextStream);
244-
filterSource = FilterSource.FromNa B2E0 tivePtr(filterSourcePtr);
245-
output = new WriteStream(nextStream, nextPtr);
246-
247-
Create(filterSource.Path, filterSource.Root, filterSource.SourceMode);
255+
state.thisStream = new GitWriteStream();
256+
state.thisStream.close = StreamCloseCallback;
257+
state.thisStream.write = StreamWriteCallback;
258+
state.thisStream.free = StreamFreeCallback;
259+
260+
state.thisPtr = Marshal.AllocHGlobal(Marshal.SizeOf(state.thisStream));
261+
Marshal.StructureToPtr(state.thisStream, state.thisPtr, false);
262+
263+
state.nextPtr = git_writestream_next;
264+
state.nextStream = new GitWriteStream();
265+
Marshal.PtrToStructure(state.nextPtr, state.nextStream);
266+
267+
state.filterSource = FilterSource.FromNativePtr(filterSourcePtr);
268+
state.output = new WriteStream(state.nextStream, state.nextPtr);
269+
270+
Create(state.filterSource.Path, state.filterSource.Root, state.filterSource.SourceMode);
271+
272+
if (!activeStreams.TryAdd(state.thisPtr, state))
273+
{
274+
// AFAICT this is a theoretical error that could only happen if we manage
275+
// to free the stream pointer but fail to remove the dictionary entry.
276+
throw new InvalidOperationException("Overlapping stream pointers");
277+
}
248278
}
249279
catch (Exception exception)
250280
{
251281
// unexpected failures means memory clean up required
252-
if (thisPtr != IntPtr.Zero)
282+
if (state.thisPtr != IntPtr.Zero)
253283
{
254-
Marshal.FreeHGlobal(thisPtr);
255-
thisPtr = IntPtr.Zero;
284+
Marshal.FreeHGlobal(state.thisPtr);
285+
state.thisPtr = IntPtr.Zero;
256286
}
257287

258288
Log.Write(LogLevel.Error, "Filter.StreamCreateCallback exception");
@@ -261,24 +291,33 @@ int StreamCreateCallback(out IntPtr git_writestream_out, GitFilter self, IntPtr
261291
result = (int)GitErrorCode.Error;
262292
}
263293

264-
git_writestream_out = thisPtr;
294+
git_writestream_out = state.thisPtr;
265295

266296
return result;
267297
}
268298

269299
int StreamCloseCallback(IntPtr stream)
270300
{
271301
int result = 0;
302+
StreamState state;
272303

273304
try
274305
{
275306
Ensure.ArgumentNotZeroIntPtr(stream, "stream");
276-
Ensure.ArgumentIsExpectedIntPtr(stream, thisPtr, "stream");
277307

278-
using (BufferedStream outputBuffer = new BufferedStream(output, BufferSize))
308+
if(!activeStreams.TryGetValue(stream, out state))
309+
{
310+
throw new ArgumentException("Unknown stream pointer", "stream");
311+
}
312+
313+
Ensure.ArgumentIsExpectedIntPtr(stream, state.thisPtr, "stream");
314+
315+
using (BufferedStream outputBuffer = new BufferedStream(state.output, BufferSize))
279316
{
280-
Complete(filterSource.Path, filterSource.Root, outputBuffer);
317+
Complete(state.filterSource.Path, state.filterSource.Root, outputBuffer);
281318
}
319+
320+
result = state.nextStream.close(state.nextPtr);
282321
}
283322
catch (Exception exception)
284323
{
@@ -288,19 +327,25 @@ int StreamCloseCallback(IntPtr stream)
288327
result = (int)GitErrorCode.Error;
289328
}
290329

291-
result = nextStream.close(nextPtr);
292-
293330
return result;
294331
}
295332

296333
void StreamFreeCallback(IntPtr stream)
297334
{
335+
StreamState state;
336+
298337
try
299338
{
300339
Ensure.ArgumentNotZeroIntPtr(stream, "stream");
301-
Ensure.ArgumentIsExpectedIntPtr(stream, thisPtr, "stream");
302340

303-
Marshal.FreeHGlobal(thisPtr);
341+
if (!activeStreams.TryRemove(stream, out state))
342+
{
343+
throw new ArgumentException("Double free or invalid stream pointer", "stream");
344+
}
345+
346+
Ensure.ArgumentIsExpectedIntPtr(stream, state.thisPtr, "stream");
347+
348+
Marshal.FreeHGlobal(state.thisPtr);
304349
}
305350
catch (Exception exception)
306351
{
@@ -312,24 +357,31 @@ void StreamFreeCallback(IntPtr stream)
312357
unsafe int StreamWriteCallback(IntPtr stream, IntPtr buffer, UIntPtr len)
313358
{
314359
int result = 0;
360+
StreamState state;
315361

316362
try
317363
{
318364
Ensure.ArgumentNotZeroIntPtr(stream, "stream");
319365
Ensure.ArgumentNotZeroIntPtr(buffer, "buffer");
320-
Ensure.ArgumentIsExpectedIntPtr(stream, thisPtr, "stream");
366+
367+
if (!activeStreams.TryGetValue(stream, out state))
368+
{
369+
throw new ArgumentException("Invalid or already freed stream pointer", "stream");
370+
}
371+
372+
Ensure.ArgumentIsExpectedIntPtr(stream, state.thisPtr, "stream");
321373

322374
using (UnmanagedMemoryStream input = new UnmanagedMemoryStream((byte*)buffer.ToPointer(), (long)len))
323-
using (BufferedStream outputBuffer = new BufferedStream(output, BufferSize))
375+
using (BufferedStream outputBuffer = new BufferedStream(state.output, BufferSize))
324376
{
325-
switch (filterSource.SourceMode)
377+
switch (state.filterSource.SourceMode)
326378
{
327379
case FilterMode.Clean:
328-
Clean(filterSource.Path, filterSource.Root, input, outputBuffer);
380+
Clean(state.filterSource.Path, state.filterSource.Root, input, outputBuffer);
329381
break;
330382

331383
case FilterMode.Smudge:
332-
Smudge(filterSource.Path, filterSource.Root, input, outputBuffer);
384+
Smudge(state.filterSource.Path, state.filterSource.Root, input, outputBuffer);
333385
break;
334386

335387
default:

0 commit comments

Comments
 (0)
0