.net-grpc-android.md

Android 连接 .Net 的gRPC服务

.Net开启ssl gRPC

1
2
3
4
5
6
7
8
webBuilder.UseKestrel().ConfigureKestrel(e =>
{
    e.Listen(IPAddress.Any, 23333, listenConfigure =>
    {
        listenConfigure.Protocols = HttpProtocols.Http2;
        listenConfigure.UseHttps(new X509Certificate2("1.pfx","123456"));
    });
});

Android 使用ssl

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val caKeyStore: KeyStore = KeyStore.getInstance("PKCS12", "BC").apply {
            load(assets.open("1.pfx"), "123456".toCharArray())
            //setCertificateEntry("server", serverCertificate)
        }

        val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()).apply {
            init(caKeyStore)
        }

        val sslContext = SSLContext.getInstance("TLS").apply {
            init(null, trustManagerFactory.trustManagers, null)
        }
        
        managedChannel =
            OkHttpChannelBuilder.forAddress("192.168.3.233", 23333)
                    .hostnameVerifier { hostname, session ->
                        true
                    }
                    .sslSocketFactory(sslContext.socketFactory)
                    .keepAliveTime(10, TimeUnit.SECONDS)
                    .useTransportSecurity()
                    .keepAliveWithoutCalls(true)
                    .build()

安卓项目配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ext.kotlin_version = '1.5.30'
ext.grpc_version = '1.40.0'


maven { url 'https://maven.aliyun.com/repository/public/'}
maven { url "https://jitpack.io" }
maven { url 'http://maven.aliyun.com/nexus/content/repositories/releases/'}
google()
mavenCentral()

classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.17'
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
apply plugin: 'com.google.protobuf'

android {
    sourceSets {
        main {
            java {
                srcDir 'src/main/java'
            }
            proto  {
                srcDir 'src/main/java/com/example/caller/proto' //指定proto文件位置
                include '**/*.proto'
            }
        }
    }
    lintOptions {
        // Do not complain about outdated deps, so that this can javax.annotation-api can be same
        // as other projects in this repo. Your project is not required to do this, and can
        // upgrade the dep.
        disable 'GradleDependency'
        // The Android linter does not correctly detect resources used in Kotlin.
        // See:
        //   - https://youtrack.jetbrains.com/issue/KT-7729
        //   - https://youtrack.jetbrains.com/issue/KT-12499
        disable 'UnusedResources'
        textReport true
        textOutput "stdout"
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
protobuf {
    protoc { artifact = 'com.google.protobuf:protoc:3.17.3' }
    plugins {
        javalite { artifact = "com.google.protobuf:protoc-gen-javalite:3.0.0" }
        grpc { artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version" // CURRENT_GRPC_VERSION
        }
    }
    generateProtoTasks {
        all().each { task ->
            task.builtins {
                java { option 'lite' }
            }
            task.plugins {
                grpc { option 'lite' }
            }
        }
    }
}
1
2
3
4
5
6
dependencies {
    implementation "io.grpc:grpc-okhttp:$grpc_version" // CURRENT_GRPC_VERSION
    implementation "io.grpc:grpc-protobuf-lite:$grpc_version" // CURRENT_GRPC_VERSION
    implementation "io.grpc:grpc-stub:$grpc_version" // CURRENT_GRPC_VERSION
    implementation "javax.annotation:javax.annotation-api:1.3.2"
}

proto文件放置位置

srcDir所指定的位置即src/main/java/me/example/packagename/proto,我这里是放在当前包名下的proto目录下

proto生成的Stub文件的包名

在proto文件中增加一行option java_package="com.example.caller";即可。com.example.caller 为包名.

生成

点击Android Studio顶部的build即可生成,即使有报错(依赖或代码错误),stub文件也是可以生成的,之后可以到generated目录下查看文件

AndroidChannelBuilder

Android上建议使用这个来作为channel,可以检测到网络连接的变化,代码如下

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
package com.example.caller.grpc;

import android.annotation.TargetApi;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.net.ConnectivityManager;
import android.net.Network;
import android.os.Build;
import android.util.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingChannelBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.internal.GrpcUtil;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
 * Builds a {@link ManagedChannel} that, when provided with a {@link Context}, will automatically
 * monitor the Android device's network state to smoothly handle intermittent network failures.
 *
 * <p>Currently only compatible with gRPC's OkHttp transport, which must be available at runtime.
 *
 * <p>Requires the Android ACCESS_NETWORK_STATE permission.
 *
 * @since 1.12.0
 */
public final class AndroidChannelBuilder extends ForwardingChannelBuilder<AndroidChannelBuilder> {

    private static final String LOG_TAG = "AndroidChannelBuilder";

    @Nullable
    private static final Class<?> OKHTTP_CHANNEL_BUILDER_CLASS = findOkHttp();

    private static Class<?> findOkHttp() {
        try {
            return Class.forName("io.grpc.okhttp.OkHttpChannelBuilder");
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    private final ManagedChannelBuilder<?> delegateBuilder;

    @Nullable private Context context;

    /**
     * Creates a new builder with the given target string that will be resolved by
     * {@link io.grpc.NameResolver}.
     */
    public static AndroidChannelBuilder forTarget(String target) {
        return new AndroidChannelBuilder(target);
    }

    /**
     * Creates a new builder with the given host and port.
     */
    public static AndroidChannelBuilder forAddress(String name, int port) {
        return forTarget(GrpcUtil.authorityFromHostAndPort(name, port));
    }

    /**
     * Creates a new builder, which delegates to the given ManagedChannelBuilder.
     *
     * @deprecated Use {@link #usingBuilder(ManagedChannelBuilder)} instead.
     */
    @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6043")
    @Deprecated
    public static AndroidChannelBuilder fromBuilder(ManagedChannelBuilder<?> builder) {
        return usingBuilder(builder);
    }

    /**
     * Creates a new builder, which delegates to the given ManagedChannelBuilder.
     *
     * <p>The provided {@code builder} becomes "owned" by AndroidChannelBuilder. The caller should
     * not modify the provided builder and AndroidChannelBuilder may modify it. That implies reusing
     * the provided builder to build another channel may result with unexpected configurations. That
     * usage should be discouraged.
     *
     * @since 1.24.0
     */
    public static AndroidChannelBuilder usingBuilder(ManagedChannelBuilder<?> builder) {
        return new AndroidChannelBuilder(builder);
    }

    private AndroidChannelBuilder(String target) {
        if (OKHTTP_CHANNEL_BUILDER_CLASS == null) {
            throw new UnsupportedOperationException("No ManagedChannelBuilder found on the classpath");
        }
        try {
            delegateBuilder =
                    (ManagedChannelBuilder)
                            OKHTTP_CHANNEL_BUILDER_CLASS
                                    .getMethod("forTarget", String.class)
                                    .invoke(null, target);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create ManagedChannelBuilder", e);
        }
    }

    private AndroidChannelBuilder(ManagedChannelBuilder<?> delegateBuilder) {
        this.delegateBuilder = Preconditions.checkNotNull(delegateBuilder, "delegateBuilder");
    }

    /**
     * Enables automatic monitoring of the device's network state.
     */
    public AndroidChannelBuilder context(Context context) {
        this.context = context;
        return this;
    }

    @Override
    protected ManagedChannelBuilder<?> delegate() {
        return delegateBuilder;
    }

    /**
     * Builds a channel with current configurations.
     */
    @Override
    public ManagedChannel build() {
        return new AndroidChannel(delegateBuilder.build(), context);
    }

    /**
     * Wraps an OkHttp channel and handles invoking the appropriate methods (e.g., {@link
     * ManagedChannel#enterIdle) when the device network state changes.
     */
    @VisibleForTesting
    static final class AndroidChannel extends ManagedChannel {

        private final ManagedChannel delegate;

        @Nullable private final Context context;
        @Nullable private final ConnectivityManager connectivityManager;

        private final Object lock = new Object();

        @GuardedBy("lock")
        private Runnable unregisterRunnable;

        @VisibleForTesting
        AndroidChannel(final ManagedChannel delegate, @Nullable Context context) {
            this.delegate = delegate;
            this.context = context;

            if (context != null) {
                connectivityManager =
                        (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
                try {
                    configureNetworkMonitoring();
                } catch (SecurityException e) {
                    Log.w(
                            LOG_TAG,
                            "Failed to configure network monitoring. Does app have ACCESS_NETWORK_STATE"
                                    + " permission?",
                            e);
                }
            } else {
                connectivityManager = null;
            }
        }

        @GuardedBy("lock")
        private void configureNetworkMonitoring() {
            // Android N added the registerDefaultNetworkCallback API to listen to changes in the device's
            // default network. For earlier Android API levels, use the BroadcastReceiver API.
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N && connectivityManager != null) {
                final DefaultNetworkCallback defaultNetworkCallback = new DefaultNetworkCallback();
                connectivityManager.registerDefaultNetworkCallback(defaultNetworkCallback);
                unregisterRunnable =
                        new Runnable() {
                            @TargetApi(Build.VERSION_CODES.LOLLIPOP)
                            @Override
                            public void run() {
                                connectivityManager.unregisterNetworkCallback(defaultNetworkCallback);
                            }
                        };
            } else {
                final NetworkReceiver networkReceiver = new NetworkReceiver();
                @SuppressWarnings("deprecation")
                IntentFilter networkIntentFilter =
                        new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
                context.registerReceiver(networkReceiver, networkIntentFilter);
                unregisterRunnable =
                        new Runnable() {
                            @TargetApi(Build.VERSION_CODES.LOLLIPOP)
                            @Override
                            public void run() {
                                context.unregisterReceiver(networkReceiver);
                            }
                        };
            }
        }

        private void unregisterNetworkListener() {
            synchronized (lock) {
                if (unregisterRunnable != null) {
                    unregisterRunnable.run();
                    unregisterRunnable = null;
                }
            }
        }

        @Override
        public ManagedChannel shutdown() {
            unregisterNetworkListener();
            return delegate.shutdown();
        }

        @Override
        public boolean isShutdown() {
            return delegate.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return delegate.isTerminated();
        }

        @Override
        public ManagedChannel shutdownNow() {
            unregisterNetworkListener();
            return delegate.shutdownNow();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }

        @Override
        public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
                MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
            return delegate.newCall(methodDescriptor, callOptions);
        }

        @Override
        public String authority() {
            return delegate.authority();
        }

        @Override
        public ConnectivityState getState(boolean requestConnection) {
            return delegate.getState(requestConnection);
        }

        @Override
        public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
            delegate.notifyWhenStateChanged(source, callback);
        }

        @Override
        public void resetConnectBackoff() {
            delegate.resetConnectBackoff();
        }

        @Override
        public void enterIdle() {
            delegate.enterIdle();
        }

        /** Respond to changes in the default network. Only used on API levels 24+. */
        @TargetApi(Build.VERSION_CODES.N)
        private class DefaultNetworkCallback extends ConnectivityManager.NetworkCallback {
            @Override
            public void onAvailable(Network network) {
                delegate.enterIdle();
            }
        }

        /** Respond to network changes. Only used on API levels < 24. */
        private class NetworkReceiver extends BroadcastReceiver {
            private boolean isConnected = false;

            @SuppressWarnings("deprecation")
            @Override
            public void onReceive(Context context, Intent intent) {
                ConnectivityManager conn =
                        (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
                android.net.NetworkInfo networkInfo = conn.getActiveNetworkInfo();
                boolean wasConnected = isConnected;
                isConnected = networkInfo != null && networkInfo.isConnected();
                if (isConnected && !wasConnected) {
                    delegate.enterIdle();
                }
            }
        }
    }
}

管理Channel

这里整了个单例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.qijin.evaluating.grpc.AndroidChannelBuilder
import io.grpc.ManagedChannel
import java.util.concurrent.TimeUnit

class GrpcService private constructor(){
    companion object {
        val instance: GrpcService by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { GrpcService() }
    }
    var channel: ManagedChannel?=null
    var _serverIP:String = ""

    fun makeChannel(serverIP:String) {
        _serverIP = serverIP
        if (channel==null) {
            createChannel()
        } else {
            channel = null
            createChannel()
        }
    }
    private fun createChannel() {
        var serverIP = ""
        var port = 23333
        if (_serverIP.contains(":")) {
            serverIP = _serverIP.split(":")[0]
            port = _serverIP.split(":")[1].toInt()
        } else {
            serverIP = _serverIP
        }
        channel = AndroidChannelBuilder.forAddress(serverIP,port)
            .usePlaintext()
            .context(App.instance)
            .keepAliveTimeout(5, TimeUnit.SECONDS)
            .keepAliveTime(1, TimeUnit.SECONDS)
            .keepAliveWithoutCalls(true)
            .build()
    }

    fun getMyChannel(forceRecreate:Boolean=false):ManagedChannel {
        if (forceRecreate) {
            channel=null
        }
        if (channel==null && _serverIP!="") {
            createChannel()
        }
        return channel!!
    }
}

只要在开始时makeChannel传入服务器ip和端口即可,只传ip则使用默认端口

getMyChannel 的 forceRecreate 参数用于在连接断开后的重新创建连接, 这样比较暴力,不过可以达到目的,关于此,详细看下面的内容

调用示例

异步调用

PS:

  1. StreamObserver的三个方法都是线程中的方法,因此在方法内执行ui相关的代码时需要 runOnUiThread
  2. 同时onNext中发生的异常会到onError中抛出,而正常情况下,onError中我们只希望捕获到接口返回的异常,因此onNext请妥善处理异常情况。否则onNext的异常会导致channel的连接诡异断开(在streams部分解释为什么)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
val client = EvaluateDeviceGrpc.newStub(GrpcService.instance.getMyChannel())
val req = EvaluateDeviceOuterClass.GetUserInfoRequest.newBuilder()
.setEveluteDeviceID(evaluateId)
.setUserNo(et_no.text.toString())
.build()

client.getUserInfo(req, object :StreamObserver<EvaluateDeviceOuterClass.UserResponse> {
    override fun onNext(value: EvaluateDeviceOuterClass.UserResponse?) {

    }
    override fun onError(t: Throwable?) {

    }
    override fun onCompleted() {

    }
}

streams订阅连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private fun startConfigSub(force: Boolean = false) {
    val c = DeviceInfoServiceGrpc.newStub(GrpcService.instance.getMyChannel(force))
            val req = DeviceInfo.DeviceConfigRequest.newBuilder().setDeviceType("P")
                .setMac(DeviceUtils.getMacAddress()).build()
    c.deviceConfig(req, object : StreamObserver<DeviceInfo.DeviceConfigResponse> {
                override fun onNext(value: DeviceInfo.DeviceConfigResponse?) {
                    
                }

                override fun onError(t: Throwable?) {
                    Log.d("WorkFragment", "报错重连")
                    if (t?.message?.contains("Keepalive") == true) {
                        startConfigSub(true)
                    } else {
                        startConfigSub()
                    }
                }

                override fun onCompleted() {
                    
                }

            })

}

streams接口在连接不正常时不会通过keepalive而重新进行调用, 因此需要在发生特定异常时,重新调用方法, 并且需要使channel重新创建,即force传true。 如果异常不是连接类异常(业务异常),则无需重新创建channel直接重新调用即可。

同步调用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun deviceReg(): Boolean {
        try {
            val c = DeviceInfoServiceGrpc.newBlockingStub(GrpcService.instance.getMyChannel())
            val req = DeviceInfo.DeviceInfoRegistRequest.newBuilder()
                .setDeviceType("P")
                .setMac(DeviceUtils.getMacAddress())
                .setDeviceIP(NetworkUtils.getIPAddress(true))
                .setHostName(DeviceUtils.getAndroidID())
                .setVersion(App.instance.getAppVersionName())
                .setDeviceID(DeviceUtils.getAndroidID())
                .setDesc("请修改此字段描述设备位置")
                .setDeviceName(DeviceUtils.getManufacturer() + "-" + DeviceUtils.getModel())
                .build()
            val resp = c.deviceRegist(req)
            Log.d("WorkFragment", resp.toString())
            return resp.code == "SUCCESS"
        } catch (e: java.lang.Exception) {
            e.printStackTrace()
            return false
        }

    }

.Net 维护streams长连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 维护设备连接
private static readonly ConcurrentDictionary<DeviceConfigRequest, IServerStreamWriter<DeviceConfigResponse>> _subscriptions =
            new ConcurrentDictionary<DeviceConfigRequest, IServerStreamWriter<DeviceConfigResponse>>();

public override async Task DeviceConfig(DeviceConfigRequest request, IServerStreamWriter<DeviceConfigResponse> responseStream, ServerCallContext context)
        {
            _logger.LogWarning($"{context.Peer} DeviceConfig");
            if (!_subscriptions.TryAdd(request,responseStream))
            {
                _logger.LogWarning("加入连接管理失败, 连接已存在");
                //连接存在说明 此设备是第二次连接过来了, 此时可以清除掉旧的连接, 添加新的连接
                //管理的连接 需要永远以最新的为准
                _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 清理旧连接");
                _subscriptions.TryRemove(request, out IServerStreamWriter<DeviceConfigResponse> value1);
                _subscriptions.TryAdd(request, responseStream);
            } else
            {
                _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 已连接, 首次下发配置");
                var config = getDeviceConfig(request.DeviceType, request.Mac);
                await responseStream.WriteAsync(config);
            }
            
            
            var count = 0;
            // Keep the stream open so we can continue writing new Messages as they are pushed
            while (!context.CancellationToken.IsCancellationRequested)
            {
                
                count++;
                // 当连接没断开时, 每1秒发送当前秒级时间
                if (count%10==0)
                {
                    await responseStream.WriteAsync(new DeviceConfigResponse()
                    {
                        Time = DateTime.Now.ToUnix2(),
                        Config = "",
                    });
                }
                if (count%100==0)
                {
                    count = 0;
                    // 当连接没断开时, 每x (10) 秒更新心跳数据
                    UpdateHeartData(request.DeviceType, request.Mac);
                }
                
                // Avoid pegging CPU
                await Task.Delay(100); //这里短一点可以让连接更快断开
            }
            // Cancellation was requested, remove the stream from stream map
            _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 连接断开, 正在清理");
            _subscriptions.TryGetValue(request, out IServerStreamWriter<DeviceConfigResponse> value);
            if (value!=null && value.Equals(responseStream))
            {
                _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 原始连接, 正在清理");
                _subscriptions.TryRemove(request, out IServerStreamWriter<DeviceConfigResponse> _);
            } else
            {
                _logger.LogWarning($"{context.Peer} 设备[{request.DeviceType}]({request.Mac}) 已是新连接, 不清理 {_subscriptions.Count}");
            }
        }

这里实现了一个设备只有一个连接的逻辑,DeviceConfigRequest 作为_subscriptions的key,只有mac地址和设备类型,也就是一个mac地址只能有一个连接在维持

  • 当设备主动断开连接(正常退出等),则跳出while循环到原始连接, 正在清理部分
  • 当设备异常断开,诸如网络问题或强制关闭,则旧连接可能还没有被清理,新连接进来后将到 加入连接管理失败, 连接已存在,此时将_subscriptions里的responseStream更新一下,已是新连接, 不清理就是为了防止新连接已建立,但是旧的连接才刚退出循环的情况,此时不能将_subscriptions里唯一的连接给清理掉
  • 服务端重启,则只要保证客户端能重新连接即可

WinForm

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace PrintNumber
{
    public class GrpcService
    {
        private GrpcService() { }
        private static GrpcService _instance = null;
        public static GrpcService Instance
        {
            get {
                if (_instance == null)
                {
                    _instance = new GrpcService();
                }
                return _instance;
            }
        }


        private Channel channel;
        private string _serverIP = "";

        public void makeChannel(string serverIP)
        {
            _serverIP = serverIP;
            if (channel==null)
            {
                createChannel();
            }
        }

        private void createChannel()
        {
            var opts = new List<ChannelOption>();
            opts.Add(new ChannelOption("grpc.keepalive_time_ms", 1000));
            opts.Add(new ChannelOption("grpc.keepalive_timeout_ms", 5000));
            opts.Add(new ChannelOption("grpc.keepalive_permit_without_calls", 1));
            channel = new Channel($"{_serverIP}:23333", ChannelCredentials.Insecure,opts);
        }

        public Channel getMyChannel(bool forceRecreate = false)
        {
            if (forceRecreate)
            {
                channel = null;
            }
            if (channel==null && _serverIP!="")
            {
                createChannel();
            }
            return channel;

        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 用于程序退出后取消streams连接
static CancellationTokenSource cancelTokenSource = new CancellationTokenSource();

private async void startConfigSubAsync(CancellationTokenSource token, bool force= false)
        {
            if (token.IsCancellationRequested || !isMain)
            {
                return;
            }
            var aClient = new DeviceInfoServiceClient(GrpcService.Instance.getMyChannel(force));
            var req = new DeviceConfigRequest();
            req.DeviceType = "Q";
            req.Mac = NetHelper.GetMacByNetworkInterface();
            using (var client = aClient.DeviceConfig(req,null,null, cancelTokenSource.Token))
            {
                try
                {
                    while (!token.IsCancellationRequested && await client.ResponseStream.MoveNext())
                    {
                        var result = client.ResponseStream.Current;
                        ///。。。
                    }
                }
                catch (RpcException e)
                {
                    System.Diagnostics.Debug.WriteLine("异常:" + e.Message);
                    if (e.StatusCode==StatusCode.Cancelled)
                    {
                        // 触发取消时, 需要退出否则容易引发异常
                        return;
                    } else
                    {
                        // 其他异常则重连
                        System.Diagnostics.Debug.WriteLine("重连接:" + e.Message);
                        startConfigSubAsync(cancelTokenSource,true);
                    }
                    

                }
                catch  (Exception e)
                {
                    //非RPC异常 则重连接
                    System.Diagnostics.Debug.WriteLine("异常:" + e.Message);
                    startConfigSubAsync(cancelTokenSource, true);
                }
                
            }


            //startConfigSubAsync(cancelTokenSource);


        }

// 程序关闭时需要清理streams连接,否则会抛出异常
private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (isMain)
            {
                cancelTokenSource.Cancel();
                GrpcEnvironment.KillServersAsync();
            }
            

        }
Licensed under CC BY-NC-SA 4.0
记录平时瞎折腾遇到的各种问题, 方便查找
使用 Hugo 构建
主题 Stack 3.29.0Jimmy 设计