I've created GRPC service host under .NET core 3.1 (using Grpc.AspNetCore v2.30 from https://github.com/grpc/grpc-dotnet) and GRPC client under .NET framework 4.6.2 (using Grpc v2.30 from https://github.com/grpc/grpc ). These frameworks for the hosts are a constraint. I'm running many calls to stress test the service from one client - one time calling Update the other calling UpdateStream. For both I'm facing a strange problem in the client side. It sometimes generates an error - can happen immediately when I start the execution or in the middle of it, and it never recovers - I must stop the client host and restart it again to make it work. it only happens when using different machines - on local host calls there are no issues.
对这个问题有什么想法/想法吗?
这是我为Update / UpdateStream客户端调用而获得的异常:
Status(StatusCode =“ Unknown”,Detail =“处理程序引发了异常。”,DebugException =“ Grpc.Core.Internal.CoreErrorDetailException:{” created“:” @ 1595 930477.263000000“,”描述“:”收到来自对等ipv4的错误:[ip]:23456“,”文件“:” T:\ src \ github \ grpc \ workspace_csharp_ext_windows_x86 \ src \ core \ lib \ surface \ call.cc“,” file_line“:1055,” grpc_message“:”处理程序引发了异常。“,” grpc_status“:2}”)
这是客户端/服务器代码:
服务器:
class Program
{
const int _port = 23456;
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.WriteLine("started - press any key to quit...");
Console.ReadKey();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.ConfigureKestrel(options =>
{
options.ConfigureEndpointDefaults(o =>
{
o.Protocols = HttpProtocols.Http2;
});
options.ListenAnyIP(_port);
});
webBuilder.UseStartup<Startup>();
});
}
public class ProxyService : StreamingApi.Protos.StreamingApi.StreamingApiBase
{
private long _handledRequests = 0;
private long _timeDiff = 0;
public override Task<UpdateResponse> Update(UpdateRequest request, ServerCallContext context)
{
Interlocked.Add(ref _timeDiff, (DateTime.Now - TimeSpan.FromTicks(Convert.ToInt64(request.Items["time"]))).Millisecond);
Interlocked.Increment(ref _handledRequests);
return Task.FromResult(new UpdateResponse());
}
public override async Task<UpdateResponse> UpdateStream(IAsyncStreamReader<UpdateRequest> requestStream, ServerCallContext serverCallContext)
{
try
{
while (await requestStream.MoveNext(serverCallContext.CancellationToken))
{
var updateReq = requestStream.Current;
Interlocked.Add(ref _timeDiff, (DateTime.Now - TimeSpan.FromTicks(Convert.ToInt64(updateReq.Items["time"]))).Millisecond);
Interlocked.Increment(ref _handledRequests);
}
}
catch(OperationCanceledException ex)
{
// log
}
return new UpdateResponse();
}
}
class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddGrpc();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<ProxyService>();
});
}
}
客户:
partial class Program
{
static void Main(string[] args)
{
Console.WriteLine();
Arguments arguments = new Arguments(args);
if (arguments.Initialized)
{
IProxyClient grpcClient = GetGrpcClient(arguments.Host, arguments.Port, ChannelCredentials.Insecure);
string limitationMsg = arguments.UseLimiter ?
$"limitation of max {arguments.MaxRequestsPerTimeUnit} requests per {arguments.TimeUnitSecs} seconds":
"no limitation";
Console.WriteLine($"\nExecuting {arguments.TotalRequests} requests with StreamMode={arguments.IsStreamMode} using {arguments.Threads} threads with {limitationMsg} ...");
var result = Run(grpcClient, arguments.Threads, arguments.TotalRequests, arguments.MaxRequestsPerTimeUnit, TimeSpan.FromSeconds(arguments.TimeUnitSecs), arguments.UseLimiter, arguments.IsStreamMode).Result;
Console.WriteLine($"Time Taken = {result.Item1}, Total Request Calls = {result.Item2}, Total Errors: {result.Item3}\n");
grpcClient.Disconnect().Wait();
Thread.Sleep(1000);
}
}
private static IProxyClient GetGrpcClient(string host, int port, ChannelCredentials channelCredentials)
{
var channel = new Channel(host, port, channelCredentials);
StreamingApi.Protos.StreamingApi.StreamingApiClient channelClient = new StreamingApi.Protos.StreamingApi.StreamingApiClient(channel);
return new ProxyClient(channel, channelClient);
}
private static async Task<(TimeSpan, int, int)> Run(IProxyClient grpcClient,
int threads,
int requests,
int maxRequestsPerTimeUnit,
TimeSpan timeUnit,
bool useLimiter,
bool isStreamMode)
{
int totalRequestCalls = 0;
int totalErrors = 0;
List<Task> tasks = new List<Task>();
int requestsPerThread = requests / threads;
TimeLimiter timeLimiter = useLimiter ? TimeLimiter.GetFromMaxCountByInterval(maxRequestsPerTimeUnit, timeUnit) : null;
UpdateRequest request = GetMeasuredRequest();
Stopwatch sw = new Stopwatch();
sw.Start();
for (int i = 0; i < threads; i++)
{
tasks.Add(Task.Run(async () =>
{
for (int requestIndex = 0; requestIndex < requestsPerThread; requestIndex++)
{
request.Items["time"] = DateTime.Now.Ticks.ToString();
if (useLimiter)
{
await timeLimiter;
}
try
{
if (isStreamMode)
{
await grpcClient.SendUpdateStream(request);
}
else
{
_ = await grpcClient.SendUpdate(request);
}
Interlocked.Increment(ref totalRequestCalls);
}
catch (Exception ex)
{
Interlocked.Increment(ref totalErrors);
Console.WriteLine(ex.Message);
Thread.Sleep(500);
}
}
}));
}
await Task.WhenAll(tasks);
sw.Stop();
return (sw.Elapsed, totalRequestCalls, totalErrors);
}
private static UpdateRequest GetMeasuredRequest()
{
UpdateRequest request = new UpdateRequest { ItemName = "pattern", SubcriptionId = "subscriptionId", IsSnapshot = false};
request.Items["key1"] = "value1";
request.Items["key2"] = "value2";
request.Items["key3"] = "value3";
request.Items["key4"] = "value4";
request.Items["key5"] = "value5";
return request;
}
}
public class ProxyClient : IProxyClient
{
private Channel _channel;
private StreamingApi.Protos.StreamingApi.StreamingApiClient _client;
private AsyncClientStreamingCall<UpdateRequest, UpdateResponse> _updateRequestStreamWriter;
public ProxyClient(Channel channel, StreamingApi.Protos.StreamingApi.StreamingApiClient client)
{
_client = client;
_updateRequestStreamWriter = client.UpdateStream();
}
public async Task Disconnect()
{
await _channel.ShutdownAsync();
}
public async Task<UpdateResponse> SendUpdate(UpdateRequest request)
{
return await _client.UpdateAsync(request);
}
public async Task SendUpdateStream(UpdateRequest request)
{
await _updateRequestStreamWriter.RequestStream.WriteAsync(request);
}
}