.NETFramework 4.6.2客户端到.NET Core 3.1服务器GRPC客户端调用失败

I've created GRPC service host under .NET core 3.1 (using Grpc.AspNetCore v2.30 from https://github.com/grpc/grpc-dotnet) and GRPC client under .NET framework 4.6.2 (using Grpc v2.30 from https://github.com/grpc/grpc ). These frameworks for the hosts are a constraint. I'm running many calls to stress test the service from one client - one time calling Update the other calling UpdateStream. For both I'm facing a strange problem in the client side. It sometimes generates an error - can happen immediately when I start the execution or in the middle of it, and it never recovers - I must stop the client host and restart it again to make it work. it only happens when using different machines - on local host calls there are no issues.

对这个问题有什么想法/想法吗?

这是我为Update / UpdateStream客户端调用而获得的异常:

Status(StatusCode =“ Unknown”,Detail =“处理程序引发了异常。”,DebugException =“ Grpc.Core.Internal.CoreErrorDetailException:{” created“:” @ 1595 930477.263000000“,”描述“:”收到来自对等ipv4的错误:[ip]:23456“,”文件“:” T:\ src \ github \ grpc \ workspace_csharp_ext_windows_x86 \ src \ core \ lib \ surface \ call.cc“,” file_line“:1055,” grpc_message“:”处理程序引发了异常。“,” grpc_status“:2}”)

这是客户端/服务器代码:

服务器:

    class Program
    {
        const int _port = 23456;

        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
            Console.WriteLine("started - press any key to quit...");
            Console.ReadKey();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.ConfigureKestrel(options =>
                    {
                        options.ConfigureEndpointDefaults(o =>
                        {
                            o.Protocols = HttpProtocols.Http2;
                            
                        });
                        options.ListenAnyIP(_port);
                    });
                    webBuilder.UseStartup<Startup>();
                });
    }

    public class ProxyService : StreamingApi.Protos.StreamingApi.StreamingApiBase
    {
        private long _handledRequests = 0;
        private long _timeDiff = 0;

        public override Task<UpdateResponse> Update(UpdateRequest request, ServerCallContext context)
        {
            Interlocked.Add(ref _timeDiff, (DateTime.Now - TimeSpan.FromTicks(Convert.ToInt64(request.Items["time"]))).Millisecond);
            Interlocked.Increment(ref _handledRequests);
            return Task.FromResult(new UpdateResponse());
        }
        public override async Task<UpdateResponse> UpdateStream(IAsyncStreamReader<UpdateRequest> requestStream, ServerCallContext serverCallContext)
        {
            try
            {
                while (await requestStream.MoveNext(serverCallContext.CancellationToken))
                {
                    var updateReq = requestStream.Current;
                    Interlocked.Add(ref _timeDiff, (DateTime.Now - TimeSpan.FromTicks(Convert.ToInt64(updateReq.Items["time"]))).Millisecond);
                    Interlocked.Increment(ref _handledRequests);
                }
            }
            catch(OperationCanceledException ex)
            {
                // log
            }
            return new UpdateResponse();
        }
    }

   class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddGrpc();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapGrpcService<ProxyService>();
            });
        }
    }

客户:

    partial class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine();
            Arguments arguments = new Arguments(args);

            if (arguments.Initialized)
            {
                IProxyClient grpcClient = GetGrpcClient(arguments.Host, arguments.Port, ChannelCredentials.Insecure);

                string limitationMsg = arguments.UseLimiter ?
                    $"limitation of max {arguments.MaxRequestsPerTimeUnit} requests per {arguments.TimeUnitSecs} seconds":
                    "no limitation";
                Console.WriteLine($"\nExecuting {arguments.TotalRequests} requests with StreamMode={arguments.IsStreamMode} using {arguments.Threads} threads with {limitationMsg} ...");

                var result = Run(grpcClient, arguments.Threads, arguments.TotalRequests, arguments.MaxRequestsPerTimeUnit, TimeSpan.FromSeconds(arguments.TimeUnitSecs), arguments.UseLimiter, arguments.IsStreamMode).Result;
                
                Console.WriteLine($"Time Taken = {result.Item1}, Total Request Calls = {result.Item2}, Total Errors: {result.Item3}\n");
                grpcClient.Disconnect().Wait();
                Thread.Sleep(1000);
            }
        }

        private static IProxyClient GetGrpcClient(string host, int port, ChannelCredentials channelCredentials)
        {
            var channel = new Channel(host, port, channelCredentials);
            StreamingApi.Protos.StreamingApi.StreamingApiClient channelClient = new StreamingApi.Protos.StreamingApi.StreamingApiClient(channel);
            return new ProxyClient(channel, channelClient);
        }

        private static async Task<(TimeSpan, int, int)> Run(IProxyClient grpcClient,
            int threads,
            int requests,
            int maxRequestsPerTimeUnit,
            TimeSpan timeUnit,
            bool useLimiter,
            bool isStreamMode)
        {
            int totalRequestCalls = 0;
            int totalErrors = 0;


            List<Task> tasks = new List<Task>();
            int requestsPerThread = requests / threads;
            TimeLimiter timeLimiter = useLimiter ? TimeLimiter.GetFromMaxCountByInterval(maxRequestsPerTimeUnit, timeUnit) : null;

            UpdateRequest request = GetMeasuredRequest();

            Stopwatch sw = new Stopwatch();
            sw.Start();

            for (int i = 0; i < threads; i++)
            {
                tasks.Add(Task.Run(async () =>
                {
                    for (int requestIndex = 0; requestIndex < requestsPerThread; requestIndex++)
                    {
                        request.Items["time"] = DateTime.Now.Ticks.ToString();
                        if (useLimiter)
                        {
                            await timeLimiter;
                        }
                        try
                        {
                            if (isStreamMode)
                            {
                               await grpcClient.SendUpdateStream(request);
                            }
                            else
                            {
                                _ = await grpcClient.SendUpdate(request);
                            }
                            Interlocked.Increment(ref totalRequestCalls);
                        }
                        catch (Exception ex)
                        {
                            Interlocked.Increment(ref totalErrors);
                            Console.WriteLine(ex.Message);
                            Thread.Sleep(500);
                        }
                    }
                }));
            }
            await Task.WhenAll(tasks);

            sw.Stop();
            return (sw.Elapsed, totalRequestCalls, totalErrors);
        }

        private static UpdateRequest GetMeasuredRequest()
        {
            UpdateRequest request = new UpdateRequest { ItemName = "pattern", SubcriptionId = "subscriptionId", IsSnapshot = false};
            request.Items["key1"] = "value1";
            request.Items["key2"] = "value2";
            request.Items["key3"] = "value3";
            request.Items["key4"] = "value4";
            request.Items["key5"] = "value5";

            return request;
        }
    }

    public class ProxyClient : IProxyClient
    {
        private Channel _channel;
        private StreamingApi.Protos.StreamingApi.StreamingApiClient _client;
        private AsyncClientStreamingCall<UpdateRequest, UpdateResponse> _updateRequestStreamWriter;

        public ProxyClient(Channel channel, StreamingApi.Protos.StreamingApi.StreamingApiClient client)
        {
            _client = client;
            _updateRequestStreamWriter = client.UpdateStream();

        }

        public async Task Disconnect()
        {
            await _channel.ShutdownAsync();
        }

        public async Task<UpdateResponse> SendUpdate(UpdateRequest request)
        {
            return await _client.UpdateAsync(request);
        }

        public async Task SendUpdateStream(UpdateRequest request)
        {
            await _updateRequestStreamWriter.RequestStream.WriteAsync(request);
        }
    }