public class ConsumerProxy {
/**
* 消费者端的动态代理
* @param interfaceClass 代理的接口类
* @param host 远程主机IP
* @param port 远程主机端口
* @param
* @return
*/
@SuppressWarnings("unchecked")
public static T consume(final Class interfaceClass,final String host,final int port) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class[]{interfaceClass}, (proxy,method,args) -> {
//创建一个客户端套接字
Socket socket = new Socket(host, port);
try {
//创建一个对外传输的对象流,绑定套接字
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
//将动态代理的方法名写入对外传输的对象流中
output.writeUTF(method.getName());
//将动态代理的方法的参数写入对外传输的对象流中
output.writeObject(args);
//创建一个对内传输的对象流,绑定套接字
//这里是为了获取提供者端传回的结果
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
//从对内传输的对象流中获取结果
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
);
}
}
有关JDK动态代理的内容可以参考AOP原理与自实现 ,BIO的部分可以参考传统IO与NIO比较
提供者端的网络传输和远程方式调用服务
public class ProviderReflect {
private static final ExecutorService executorService = Executors.newCachedThreadPool();
/**
* RPC监听和远程方法调用
* @param service RPC远程方法调用的接口实例
* @param port 监听的端口
* @throws Exception
*/
public static void provider(final Object service,int port) throws Exception {
//创建服务端的套接字,绑定端口port
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
//开始接收客户端的消息,并以此创建套接字
final Socket socket = serverSocket.accept();
//多线程执行,这里的问题是连接数过大,线程池的线程数会耗尽
executorService.execute(() -> {
try {
//创建呢一个对内传输的对象流,并绑定套接字
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
try {
//从对象流中读取接口方法的方法名
String methodName = input.readUTF();
//从对象流中读取接口方法的所有参数
Object[] args = (Object[]) input.readObject();
Class[] argsTypes = new Class[args.length];
for (int i = 0;i < args.length;i++) {
argsTypes[i] = args[i].getClass();
}
//创建一个对外传输的对象流,并绑定套接字
//这里是为了将反射执行结果传递回消费者端
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
Class>[] interfaces = service.getClass().getInterfaces();
Method method = null;
for (int i = 0;i < interfaces.length;i++) {
method = interfaces[i].getDeclaredMethod(methodName,argsTypes);
if (method != null) {
break;
}
}
Object result = method.invoke(service, args);
//将反射执行结果写入对外传输的对象流中
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
启动提供者端的网络侦听和远程调用
public class RPCProviderMain {
public static void main(String[] args) throws Exception {
HelloService service = new HelloServiceImpl();
ProviderReflect.provider(service,8083);
}
}
启动消费者的动态代理调用
public class RPCConsumerMain {
public static void main(String[] args) throws InterruptedException {
HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083);
for (int i = 0;i < 1000;i++) {
String hello = service.sayHello("你好_" + i);
System.out.println(hello);
Thread.sleep(1000);
}
}
}