博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
GRPC单向/双向流
阅读量:5320 次
发布时间:2019-06-14

本文共 8689 字,大约阅读时间需要 28 分钟。

开始食用grpc(之二)

https://www.cnblogs.com/funnyzpc/p/9570992.html

开始食用grpc(之一)

https://www.cnblogs.com/funnyzpc/p/9501353.html

https://grpc.io

https://github.com/grpc/grpc-java/tree/master/examples/example-tls
https://github.com/grpc/grpc-java
https://www.myssl.cn/tools/merge-pem-cert.html

 

双向流式调用方法及注意事项:

  由于双向流的使用方式不用于上期所讲的,这里我从编写一步步讲。

  先在preview-grpc-lib工程先的proto文件夹下编写一个包含双向流的是proto文件以生成客户端和服务器相关代码(记得把生成的代码放入工程内)。

  (MultiStream.proto)

1 syntax = "proto3"; 2  3 option java_multiple_files = true; 4 option java_package = "com.funnyzpc.XXX.grpc.lib.multiStream"; 5 //定义一个服务 6 service MultiStreamService{ 7     rpc queryStream (stream MultiStreamReq) returns (stream MultiStreamResp) { 8  9     }10 11 }12 //定义一个请求体(用于传参)13 message MultiStreamReq{14     int32 page_no=1;15     int32 page_size=2;16     MultiStreamDataReq data=3;17 }18 19 message MultiStreamDataReq{20     string name=1;21     bool type=2;22 }23 //定义一个响应体(用于回参)24 message MultiStreamResp{25     string req_str=1;26     MultiStreamFirstResp first=2;27 }28 message MultiStreamFirstResp{29     string f_content=1;30     int64 idx=2;31 32 }

这里可能需要对比着上一节所讲的复杂proto文件编写的内容,可以看到:请求体和响应体的定义大致都是一样的,只是在服务定义的时候会有一些些差别>请求体和响应体的前面多了一个关键字"stream” ,就是(请求或响应)只要一方是以流的方式发送就需要声明为 “stream" 。

  编写个客户端服务代码:

1 @Service 2 public class GrpcMultiStreamClientService { 3     private static final Logger LOG=LoggerFactory.getLogger(GrpcMultiStreamClientService.class); 4  5     @GrpcClient("preview-grpc-server") 6     private Channel rpcChannel; 7  8     /** 9      * grpc>双向流方式10      * @return11      */12     public Object queryByStream()throws Exception{13         Map
resp=new HashMap<>();14 15 StreamObserver
req= new StreamObserver
() {16 @Override17 public void onNext(MultiStreamResp multiStreamResp) {18 resp.put("req_str",multiStreamResp.getReqStr());19 resp.put("f_content",multiStreamResp.getFirst().getFContent());20 resp.put("idx",multiStreamResp.getFirst().getIdx());21 LOG.info("onNext()");22 //return resp;23 }24 25 @Override26 public void onError(Throwable throwable) {27 LOG.info("onError()");28 }29 30 @Override31 public void onCompleted() {32 LOG.info("onCompleted()");33 }34 };35 36 MultiStreamServiceGrpc.MultiStreamServiceStub stud=MultiStreamServiceGrpc.newStub(rpcChannel);37 StreamObserver
reqStream=stud.queryStream(req);38 39 MultiStreamDataReq streamDataReq=MultiStreamDataReq.newBuilder()40 .setName("req>name field")41 .setType(false)42 .build();43 MultiStreamReq streamReq= MultiStreamReq.newBuilder()44 .setPageNo(1)45 .setPageSize(20)46 .setData(streamDataReq).build();47 48 reqStream.onNext(streamReq);49 reqStream.onCompleted();50 return resp;51 }52 }

可以在上图看到,请求方法内首先是要放入一个构造的内部请求方法,请求体也需要放入到StreamObserver这个对象中,这是与之前编写的grpc客户端(阻塞)所不一样的地方,同时构造stub的时候是newStub而不是newBlockingStub ,当然这两者是有区别的,前者仅适用于http2二进制流的方式 并且是一个异步的(这是重点),而后者是仅适用于http1.1的字符明文方式 并且是阻塞方式(这也是重点),后面我会说说这两者的具体使用区别。

 

 

接下来写一个grpc服务端服务类,这是代码:

1 @GrpcService(value= MultiStreamServiceGrpc.class) 2 public class GrpcMultiStreamService extends MultiStreamServiceGrpc.MultiStreamServiceImplBase{ 3     private static final Logger LOG= LoggerFactory.getLogger(GrpcMultiStreamService.class); 4  5     @Override 6     public StreamObserver
queryStream(StreamObserver
resp) { 7 return new StreamObserver
() { 8 @Override 9 public void onNext(MultiStreamReq multiStreamReq) {10 MultiStreamFirstResp firstResp=MultiStreamFirstResp.newBuilder()11 .setFContent("f_content")12 .setIdx(99).build();13 MultiStreamResp req=MultiStreamResp.newBuilder()14 .setReqStr("req_str")15 .setFirst(firstResp).build();16 resp.onNext(req);17 resp.onCompleted();18 }19 20 @Override21 public void onError(Throwable throwable) {22 LOG.info("onError()");23 }24 25 @Override26 public void onCompleted() {27 LOG.info("onCompleted()");28 }29 };30 31 32 }

整体的构造方法和逻辑代码和客户端代码相似,同时服务端的逻辑代码基本上全在StreamObserver这个异步对象中处理,同时这个构造方法也提供了错误和完成所对的重载方法,要进行业务处理也必须在重载的onNext方法中编写。

   主题的服务已经编写完成,现在添加一个控制器来看看这个服务有没问题。

1     @Autowired2     private GrpcMultiStreamClientService multiStreamClientService;3 4     @RequestMapping("/grpc4")5     public Object grpc4()throws Exception{6         return multiStreamClientService.queryByStream();7     }

 

 

可能你会咦的一声说:请求是成功的,但为什么取到的服务端的参数是空呢?

其实这个很好理解,因为客户端的请求服务方式是流,此种方式下响应当然是异步的,这里方便调试,需要添加线程阻塞,才可能获取到服务端的响应参数(下图中红色部分)>

1 @Service 2 public class GrpcMultiStreamClientService { 3     private static final Logger LOG=LoggerFactory.getLogger(GrpcMultiStreamClientService.class); 4  5     @GrpcClient("preview-grpc-server") 6     private Channel rpcChannel; 7  8     /** 9      * grpc>双向流方式10      * @return11      */12     public Object queryByStream()throws Exception{13         Map
resp=new HashMap<>();14 15 StreamObserver
req= new StreamObserver
() {16 @Override17 public void onNext(MultiStreamResp multiStreamResp) {18 resp.put("req_str",multiStreamResp.getReqStr());19 resp.put("f_content",multiStreamResp.getFirst().getFContent());20 resp.put("idx",multiStreamResp.getFirst().getIdx());21 LOG.info("onNext()");22 //return resp;23 }24 25 @Override26 public void onError(Throwable throwable) {27 LOG.info("onError()");28 }29 30 @Override31 public void onCompleted() {32 LOG.info("onCompleted()");33 }34 };35 36 MultiStreamServiceGrpc.MultiStreamServiceStub stud=MultiStreamServiceGrpc.newStub(rpcChannel);37 StreamObserver
reqStream=stud.queryStream(req);38 39 MultiStreamDataReq streamDataReq=MultiStreamDataReq.newBuilder()40 .setName("req>name field")41 .setType(false)42 .build();43 MultiStreamReq streamReq= MultiStreamReq.newBuilder()44 .setPageNo(1)45 .setPageSize(20)46 .setData(streamDataReq).build();47 48 reqStream.onNext(streamReq);49 reqStream.onCompleted();50 Thread.sleep(10000);51 return resp;52 }53 }

可以看到线程睡眠了10秒,如果打断点可以看到 睡眠的过程中会响应客户端中的onNext方法,再就是把参数放入到resp中,当然客户端服务为流的方式下一般不做线程睡眠的操作,因为服务器有可能超时,如果超时那可就麻烦了。所以说grpc异步是有极好的应用场景,比如业务费阻塞,日志处理等等,同时如果需要直接响应请使用阻塞的方式(上面已经说过了),好了,这个时候,我们看看结果>

ok,可以顺利的看到服务器的响应结果了。

 

grpc安全问题及拦截器:

  对于grpc安全问题,grpc只在服务端提供了 服务端证书验证 的方式,具体就是在在客户端请求的时候验证客户地址是否是有效而已,默认不使用的时候服务端证书的开关是关闭着的,这个验证其实也很简陋,具体的可以看看源码便知:

如若开发的系统要保证极高的安全度,建议使用这两类方式:

  A>将客户端应用和服务端应用放置在同一个内往下,服务端关闭外网直接访问

  B>可以在服务端添加拦截器,使用token的方式来验证客户端身份是否合法(这种方式可能需要客户端设置请求头)

  对于以上两种安全访问方式,也可以以混合的方式使用,对于以上后者,我简单的列举下如何使用拦截器,就一个简单的例子呵~

  首先填写一个服务端拦截器>

1 public class GrpcInterceptor implements ServerInterceptor { 2     private static final Logger LOG=LoggerFactory.getLogger(GrpcInterceptor.class); 3  4     @Override 5     public 
ServerCall.Listener
interceptCall(ServerCall
call, Metadata headers, ServerCallHandler
next) { 6 LOG.info(call.getAttributes().toString()); 7 String inetSocketString = call.getAttributes() 8 .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString(); 9 LOG.info(inetSocketString);10 return next.startCall(call,headers);11 }12 }

如上,拦截器实现于grpc 的 ServerInterceptor 来编写的,如果需要做拦截处理 可以直接在interceptCall方法中编写相应的逻辑。

  然后需要在服务端服务类的注解中声明所使用的拦截器>

1 @GrpcService(value= MultiStreamServiceGrpc.class,interceptors = GrpcInterceptor.class)2 public class GrpcMultiStreamService extends MultiStreamServiceGrpc.MultiStreamServiceImplBase{3 //此处略4 }

拦截器声明可以见以上代码红色部分,以上代码的具体逻辑部分与以上GrpcMultiStreamService内容相同,同时顺带说下上面注解中的value变量,这个变量只是声明当前服务端服务类所使用的grpc的服务类是什么,当然可以填写其他的grpc的服务类(一定是proto文件生成的类才可以),并且不能为空!,同时这里就不给测试结果囖,读者打个断点就知道了。

转载于:https://www.cnblogs.com/bluestorm/p/10552846.html

你可能感兴趣的文章
【luogu P2298 Mzc和男家丁的游戏】 题解
查看>>
CVE-2012-0158 分析
查看>>
Git提交代码冲突:commit your changes or stash them before you can merge.
查看>>
OpenCV —— 直方图与匹配
查看>>
简单查看tomcat中部署java服务的内存使用情况
查看>>
jeasyui 造成$.data(...) is undefined报错的原因及解决
查看>>
ACCP7.0-S2-复习自测-15测试分析
查看>>
洛谷P3295 [SCOI2016]萌萌哒(倍增+并查集)
查看>>
loj#6053. 简单的函数(Min_25筛)
查看>>
洛谷P5245 【模板】多项式快速幂
查看>>
js apply 和call的区别
查看>>
调查显示:近半数人“婚房”首付要啃老
查看>>
java网络请求工具类
查看>>
linux常用命令
查看>>
头文件导入方式
查看>>
v-model
查看>>
JAVA基本数据类型所占字节数是多少?(32位系统)
查看>>
[杂题]FZU2190 非提的救赎
查看>>
yum安装方式的php,切换NTS为ZTS版本
查看>>
[学习]对于学习的一点心得
查看>>