emmmm不想一点一点介绍这个代码了,注释写的挺清楚的,主要思路就是Consumer通过反射获取对应的服务接口实例时,使用的代理模式。调用代理对象时的方法时,实际上是通过Netty的channel将对应的方法调用信息传给Provider直接wait()阻塞,然后通过synchronized+wait()+notify()实现线程通信,当Provider接收到客户端得请求时,根据请求数据去服务容器中寻找对应的服务进行调用处理(这里简单得通过一个Map模拟了服务容器,然后通过反射调用Method得invoke方法达到动态调用服务得办法),然后Provider调用成功后,将数据返回给consumer,consumer读取到处理结果后调用notify()方法,唤醒用户调用的代理方法,将处理结果返回。
代码中需要注意的点:
- 使用线程池获取线程通过channel发送数据给服务端,然后阻塞,通过简单的线程通信实现得到服务器响应后自动执行返回。
- 用户真正调用的是动态代理创建的代理对象,代理对象内部实现的才是上一步的步骤。
- 使用了Protobuf数据传输格式,需要编写*.proto文件,然后编译成Java源文件,最后记得加入protobuf的编解码器,
- 服务器通过一个Map模拟了服务调用功能的容器,然后通过反射的方式动态调用对用的方法
- 因为客户端没有加关闭同步的代码,不要加上关闭EventLoopGroup的代码。(别问我为啥,我因为这卡了一个多小时,各种找bug后面发现启动后自己关了)
差不多就这些内容,需要的同学可以分析一下对应的源码,客户端的handler代码实现相对比较巧妙,最后贴上代码。
package client;
import service.RpcHelloService;
/**
* 用于启动RPC的消费者服务
*/
public class ConsumerBootStrap {
public static void main(String[] args) {
ConsumerClient client = new ConsumerClient("127.0.0.1", 9898);
RpcHelloService bean = (RpcHelloService) client.getBean(RpcHelloService.class);
String msg = "你好呀,服务器。";
String res = bean.hello(msg);
System.out.println(res);
}
}
package client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import protocol.Data;
import java.util.concurrent.Callable;
public class ConsumerChannelHandler
extends SimpleChannelInboundHandler<Data.Rpcmessage>
implements Callable{
private Data.Rpcmessage msg;//需要发送给服务器的消息
private ChannelHandlerContext context;//Netty的上下文对象,帮助我们发送调用请求
private String result;//rpc的返回结果
public void setMsg(Data.Rpcmessage msg) {
System.out.println("setMsg");
this.msg = msg;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道连接成功...");
context = ctx;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext ctx, Data.Rpcmessage msg) throws Exception {
if (msg.getMsgType().equals(Data.Rpcmessage.MessageType.Response) ){
if ("200".equals(msg.getCode())) {
result = msg.getResult();//如果响应成功了,那么将结果赋给result
}
notify();//唤醒调用线程
}
}
@Override
public synchronized Object call() throws Exception {
System.out.println("call...");
//调用call方法将消息发送给服务提供者
context.writeAndFlush(msg);
wait();//等待服务提供者响应结果然后继续执行
return result;
}
}
package client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.socketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.string.StringEncoder;
import protocol.Data;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 消费者底层Netty网络通信客户端
*/
public class ConsumerClient {
//用于发送服务调用的线程池
private ExecutorService pool = Executors.newFixedThreadPool(5);
private ConsumerChannelHandler handler;
private final String HOST;
private final int PORT;
public ConsumerClient(String host, int port) {
this.HOST = host;
this.PORT = port;
}
public Object getBean(Class clazz){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{clazz},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//实际的业务方法需要通过线程池进行远程调用
//RpcHelloService hello
if (handler == null){
startClient();
}
Data.Rpcmessage message = Data.Rpcmessage.newBuilder().
setArgs((String) args[0])//设置方法执行的参数
.setClassName(clazz.getSimpleName())//设置远程调用的类名
.setMethod(method.getName())//设置调用的方法名
.setParamClazz(args[0].getClass().getName())
.setMsgType(Data.Rpcmessage.MessageType.Rquest)//设置当前是一个请求调用
.build();
handler.setMsg(message);
return pool.submit(handler).get();//真正调用的方法是远程的方法。
}
}
);
}
//并不是在创建ConsumerClient时候就启动客户端,只有真正准备调用服务接口之后才会准备启动
private void startClient(){
//客户端启动程序
handler = new ConsumerChannelHandler();
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入protobuf的编解码器
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtobufDecoder(Data.Rpcmessage.getDefaultInstance()));
//加入自定义业务处理器
pipeline.addLast(handler);
}
});
bootstrap.connect(HOST, PORT).sync();
System.out.println("客户端启动成功...");
} catch (InterruptedException e) {
e.printstacktrace();
}
}
}
Syntax="proto3";//指定protobuf的版本
option java_outer_classname="Data";
message Rpcmessage{
string className = 1;//调用的类名
string method = 2;//调用的方法名
string paramClazz = 3;//参数的全类名
string args = 4;//方法调用所需要的参数
string code = 5;//响应状态码
string result = 6;//响应结果
MessageType msgType = 7;//当前消息是请求还是响应
enum MessageType{
Rquest = 0;
Response = 1;
}
}
package server;
public class ProviderBootStrap {
public static void main(String[] args) {
ProviderServer server = new ProviderServer(9898);
server.start();
}
}
package server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import protocol.Data;
import service.RpcHelloService;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
public class ProviderChannelHandler extends SimpleChannelInboundHandler<Data.Rpcmessage> {
//使用一个hashMap模拟服务调用容器
private static final HashMap<String,Object> MAP = new HashMap<>();
static {
RpcHelloService rpcHelloService = new RpcHelloService() {
@Override
public String hello(String msg) {
return "你好呀,客户端,我已经收到了你的请求:["+msg+"]";
}
};
MAP.put("RpcHelloService.hello",rpcHelloService);
System.out.println("初始化容器");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Data.Rpcmessage msg) throws Exception {
//收到了远程调用请求
System.out.println("服务器读取数据");
if (Data.Rpcmessage.MessageType.Rquest == msg.getMsgType()){
String className = msg.getClassName();
String method = msg.getmethod();
String args = msg.getArgs();
String paramClazz = msg.getParamClazz();
String result = invokeRpc(className,method,args,paramClazz);
Data.Rpcmessage resMsg = Data.Rpcmessage.newBuilder()
.setMsgType(Data.Rpcmessage.MessageType.Response)
.setResult(result)
.setCode("200")
.build();
ctx.writeAndFlush(resMsg);
}
}
private String invokeRpc(String className, String method, String args,String paramClazz) {
try {
String key = className+"."+method;
Object o = MAP.get(key);
Method tarMethod = o.getClass().getDeclaredMethod(method, Class.forName(paramClazz));
Object result = tarMethod.invoke(o, args);
return (String) result;
} catch (Exception e) {
e.printstacktrace();
}
return null;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printstacktrace();
ctx.close();
}
}
package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.socketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.string.StringDecoder;
import protocol.Data;
public class ProviderServer {
private final int port;
public ProviderServer(int port) {
this.port = port;
}
public void start(){
this.startServer0();
}
private void startServer0(){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtobufDecoder(Data.Rpcmessage.getDefaultInstance()));
pipeline.addLast(new ProviderChannelHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port).sync();
System.out.println("服务器启动成功.....");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printstacktrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
package service;
public interface RpcHelloService {
String hello(String msg);
}
protobuf编译后的源代码因为太长了,就不贴了,通过protoc.exe指令直接编译即可。