8000 fix: return socket after session disposed · cnblogs/dashscope-sdk@f3f83de · GitHub
[go: up one dir, main page]

Skip to content

Commit f3f83de

Browse files
committed
fix: return socket after session disposed
1 parent 5139740 commit f3f83de

File tree

10 files changed

+78
-41
lines changed

10 files changed

+78
-41
lines changed

sample/Cnblogs.DashScope.Sample/Program.cs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
using Microsoft.Extensions.AI;
1010

1111
Console.WriteLine("Reading key from environment variable DASHSCOPE_KEY");
12-
var apiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY");
12+
var apiKey = Environment.GetEnvironmentVariable("DASHSCOPE_KEY", EnvironmentVariableTarget.Process)
13+
?? Environment.GetEnvironmentVariable("DASHSCOPE_KEY", EnvironmentVariableTarget.User);
1314
if (string.IsNullOrEmpty(apiKey))
1415
{
1516
Console.Write("ApiKey > ");
@@ -64,23 +65,24 @@
6465
await ApplicationCallAsync(applicationId, userInput);
6566
break;
6667
case SampleType.TextToSpeech:
67-
var tts = await dashScopeClient.CreateSpeechSynthesizerSocketSessionAsync("cosyvoice-v2");
68-
var taskId = await tts.RunTaskAsync(
69-
new SpeechSynthesizerParameters() { Voice = "longxiaochun_v2", Format = "mp3" });
70-
await tts.ContinueTaskAsync(taskId, "博客园");
71-
await tts.ContinueTaskAsync(taskId, "代码改变世界");
72-
await tts.FinishTaskAsync(taskId);
73-
var file = new FileInfo("tts.mp3");
74-
var writer = file.OpenWrite();
75-
await foreach (var b in tts.GetAudioAsync())
7668
{
77-
writer.WriteByte(b);
78-
}
79-
80-
writer.Close();
69+
using var tts = await dashScopeClient.CreateSpeechSynthesizerSocketSessionAsync("cosyvoice-v2");
70+
var taskId = await tts.RunTaskAsync(
71+
new SpeechSynthesizerParameters() { Voice = "longxiaochun_v2", Format = "mp3" });
72+
await tts.ContinueTaskAsync(taskId, "博客园");
73+
await tts.ContinueTaskAsync(taskId, "代码改变世界");
74+
await tts.FinishTaskAsync(taskId);
75+
var file = new FileInfo("tts.mp3");
76+
var writer = file.OpenWrite();
77+
await foreach (var b in tts.GetAudioAsync())
78+
{
79+
writer.WriteByte(b);
80+
}
8181

82-
Console.WriteLine($"audio saved to {file.FullName}");
83-
break;
82+
writer.Close();
83+
Console.WriteLine($"audio saved to {file.FullName}");
84+
break;
85+
}
8486
}
8587

8688
return;

src/Cnblogs.DashScope.AspNetCore/Cnblogs.DashScope.AspNetCore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<PropertyGroup>
44
<Product>Cnblogs.DashScopeSDK</Product>
55
<GenerateDocumentationFile>true</GenerateDocumentationFile>
6-
<PackageTags>Cnblogs;Dashscope;AI;Sdk;Embedding;AspNetCore</PackageTags>
6+
<PackageTags>Cnblogs;Dashscope;AI;Sdk;Embedding;AspNetCore;Bailian</PackageTags>
77
<RootNamespace>Cnblogs.DashScope.AspNetCore</RootNamespace>
88
</PropertyGroup>
99

src/Cnblogs.DashScope.Core/Cnblogs.DashScope.Core.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<PropertyGroup>
44
<Product>Cnblogs.DashScopeSDK</Product>
55
<GenerateDocumentationFile>true</GenerateDocumentationFile>
6-
<PackageTags>Cnblogs;Dashscope;AI;Sdk;Embedding;</PackageTags>
6+
<PackageTags>Cnblogs;Dashscope;AI;Sdk;Embedding;Bailian;</PackageTags>
77
<Description>Provide pure api access to DashScope without extra references. Cnblogs.DashScope.Sdk should be used for general purpose.</Description>
88
</PropertyGroup>
99

@@ -15,5 +15,5 @@
1515
<PackageReference Include="Microsoft.DeepDev.TokenizerLib" Version="1.3.3" />
1616
<PackageReference Include="System.Text.Json" Version="8.0.5" />
1717
</ItemGroup>
18-
18+
1919
</Project>

src/Cnblogs.DashScope.Core/DashScopeClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public DashScopeClient(
3030
string baseAddress = DashScopeDefaults.HttpApiBaseAddress,
3131
string baseWebsocketAddress = DashScopeDefaults.WebsocketApiBaseAddress,
3232
string? workspaceId = null,
33-
int socketPoolSize = 5)
33+
int socketPoolSize = 32)
3434
: base(
3535
GetConfiguredClient(apiKey, timeout, baseAddress, workspaceId),
3636
GetConfiguredSocketPool(apiKey, baseWebsocketAddress, socketPoolSize, workspaceId))
@@ -40,7 +40,7 @@ public DashScopeClient(
4040
private static DashScopeClientWebSocketPool GetConfiguredSocketPool(
4141
string apiKey,
4242
string baseAddress,
43-
int socketPoolSize = 5,
43+
int socketPoolSize,
4444
string? workspaceId = null)
4545
{
4646
var key = GetCacheKey();

src/Cnblogs.DashScope.Core/DashScopeClientWebSocket.cs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ namespace Cnblogs.DashScope.Core;
1111
/// </summary>
1212
public sealed class DashScopeClientWebSocket : IDisposable
1313
{
14-
private static readonly UnboundedChannelOptions UnboundedChannelOptions = new()
15-
{
16-
SingleWriter = true,
17-
};
14+
priva 8000 te static readonly UnboundedChannelOptions UnboundedChannelOptions =
15+
new()
16+
{
17+
SingleWriter = true,
18+
SingleReader = true,
19+
AllowSynchronousContinuations = true
20+
};
1821

1922
private readonly ClientWebSocket _socket = new();
2023
private Task? _receiveTask;
@@ -63,14 +66,15 @@ public async Task ConnectAsync<TOutput>(Uri uri, CancellationToken cancellationT
6366
{
6467
await _socket.ConnectAsync(uri, cancellationToken);
6568
_receiveTask = ReceiveMessagesAsync<TOutput>(cancellationToken);
66-
State = DashScopeWebSocketState.Connected;
69+
State = DashScopeWebSocketState.Ready;
6770
}
6871

6972
/// <summary>
7073
/// Reset binary output.
7174
/// </summary>
7275
public void ResetOutput()
7376
{
77+
BinaryOutput.Writer.TryComplete();
7478
BinaryOutput = Channel.CreateUnbounded<byte>(UnboundedChannelOptions);
7579
_taskStartedSignal = new TaskCompletionSource<bool>();
7680
}
@@ -84,17 +88,22 @@ public void ResetOutput()
8488
/// <typeparam name="TInput">Type of the input.</typeparam>
8589
/// <typeparam name="TParameter">Type of the parameter.</typeparam>
8690
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> is requested.</exception>
87-
/// <exception cref="InvalidOperationException">Websocket is not connected.</exception>
91+
/// <exception cref="InvalidOperationException">Websocket is not connected or already closed.</exception>
8892
/// <exception cref="ObjectDisposedException">The underlying websocket has already been closed.</exception>
8993
public Task SendMessageAsync<TInput, TParameter>(
9094
DashScopeWebSocketRequest<TInput, TParameter> request,
9195
CancellationToken cancellationToken = default)
9296
where TInput : class
9397
where TParameter : class
9498
{
99+
if (State == DashScopeWebSocketState.Closed)
100+
{
101+
throw new InvalidOperationException("Socket is already closed.");
102+
}
103+
104+
var json = JsonSerializer.Serialize(request, DashScopeDefaults.SerializationOptions);
95105
return _socket.SendAsync(
96-
new ArraySegment<byte>(
97-
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request, DashScopeDefaults.SerializationOptions))),
106+
new ArraySegment<byte>(Encoding.UTF8.GetBytes(json)),
98107
WebSocketMessageType.Text,
99108
true,
100109
cancellationToken);
@@ -110,7 +119,6 @@ public Task SendMessageAsync<TInput, TParameter>(
110119
try
111120
{
112121
var result = await _socket.ReceiveAsync(segment, cancellationToken);
113-
114122
if (result.MessageType == WebSocketMessageType.Close)
115123
{
116124
await CloseAsync(cancellationToken);
@@ -168,7 +176,7 @@ public async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationTo
168176
_taskStartedSignal.TrySetResult(true);
169177
break;
170178
case "task-finished":
171-
State = DashScopeWebSocketState.Connected;
179+
State = DashScopeWebSocketState.Ready;
172180
BinaryOutput.Writer.Complete();
173181
break;
174182
case "task-failed":
@@ -199,7 +207,7 @@ public async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationTo
199207
public async Task CloseAsync(CancellationToken cancellationToken = default)
200208
{
201209
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken);
202-
Dispose();
210+
State = DashScopeWebSocketState.Closed;
203211
}
204212

205213
private void Dispose(bool disposing)
@@ -208,7 +216,6 @@ private void Dispose(bool disposing)
208216
{
209217
// Dispose managed resources.
210218
_socket.Dispose();
211-
State = DashScopeWebSocketState.Closed;
212219
BinaryOutput.Writer.TryComplete();
213220
}
214221
}

src/Cnblogs.DashScope.Core/DashScopeClientWebSocketPool.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@ public sealed class DashScopeClientWebSocketPool(DashScopeOptions options) : IDi
1313

1414
internal void ReturnSocketAsync(DashScopeClientWebSocket socket)
1515
{
16-
if (socket.State != DashScopeWebSocketState.Connected)
16+
if (socket.State != DashScopeWebSocketState.Ready)
1717
{
1818
// not returnable, disposing.
1919
socket.Dispose();
20+
return;
2021
}
2122

2223
_available.Add(socket);
@@ -39,10 +40,11 @@ public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync<TOutput>(
3940
if (_available.IsEmpty == false)
4041
{
4142
found = _available.TryTake(out socket);
42-
if (socket?.State != DashScopeWebSocketState.Connected)
43+
if (socket?.State != DashScopeWebSocketState.Ready)
4344
{
4445
// expired
4546
found = false;
47+
socket?.Dispose();
4648
}
4749
}
4850
else
@@ -52,12 +54,11 @@ public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync<TOutput>(
5254
}
5355
}
5456

55-
return ActiveSocket(socket!);
57+
return ActivateSocket(socket!);
5658
}
5759

58-
private DashScopeClientWebSocketWrapper ActiveSocket(DashScopeClientWebSocket socket)
60+
private DashScopeClientWebSocketWrapper ActivateSocket(DashScopeClientWebSocket socket)
5961
{
60-
socket.ResetOutput();
6162
_active.Add(socket);
6263
return new DashScopeClientWebSocketWrapper(socket, this);
6364
}

src/Cnblogs.DashScope.Core/DashScopeClientWebSocketWrapper.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ public sealed record DashScopeClientWebSocketWrapper(DashScopeClientWebSocket So
1818
/// </summary>
1919
public Task TaskStarted => Socket.TaskStarted;
2020

21+
/// <summary>
22+
/// Reset task signal and output cannel.
23+
/// </summary>
24+
public void ResetTask() => Socket.ResetOutput();
25+
2126
/// <summary>
2227
/// Send message to server.
2328
/// </summary>
@@ -40,5 +45,11 @@ public Task SendMessageAsync<TInput, TParameter>(
4045
public void Dispose()
4146
{
4247
Pool.ReturnSocketAsync(Socket);
48+
GC.SuppressFinalize(this);
49+
}
50+
51+
~DashScopeClientWebSocketWrapper()
52+
{
53+
Dispose();
4354
}
4455
}

src/Cnblogs.DashScope.Core/DashScopeOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,5 @@ public class DashScopeOptions
3030
/// <summary>
3131
/// Default socket pool size.
3232
/// </summary>
33-
public int SocketPoolSize { get; set; } = 10;
33+
public int SocketPoolSize { get; set; } = 32;
3434
}

src/Cnblogs.DashScope.Core/DashScopeWebSocketState.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public enum DashScopeWebSocketState
1313
/// <summary>
1414
/// The socket has been connected and ready.
1515
/// </summary>
16-
Connected,
16+
Ready,
1717

1818
/// <summary>
1919
/// The socket has a running task waiting to be finished.

src/Cnblogs.DashScope.Core/SpeechSynthesizerSocketSession.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
/// </summary>
66
/// <param name="socket">Underlying websocket.</param>
77
/// <param name="modelId">Model name to use.</param>
8-
public class SpeechSynthesizerSocketSession(DashScopeClientWebSocketWrapper socket, string modelId)
8+
public sealed class SpeechSynthesizerSocketSession(DashScopeClientWebSocketWrapper socket, string modelId)
9+
: IDisposable
910
{
1011
/// <summary>
1112
/// Send a run-task command, use random GUID as taskId.
@@ -52,6 +53,7 @@ public async Task<string> RunTaskAsync(
5253
Parameters = parameters
5354
}
5455
};
56+
socket.ResetTask();
5557
await socket.SendMessageAsync(command, cancellationToken);
5658
await socket.TaskStarted;
5759
return taskId;
@@ -105,4 +107,18 @@ public IAsyncEnumerable<byte> GetAudioAsync()
105107
{
106108
return socket.BinaryOutput;
107109
}
110+
111+
private void Dispose(bool disposing)
112+
{
113+
if (disposing)
114+
{
115+
socket.Dispose();
116+
}
117+
}
118+
119+
/// <inheritdoc />
120+
public void Dispose()
121+
{
122+
Dispose(true);
123+
}
108124
}

0 commit comments

Comments
 (0)
0