Java serialization: the difference between Protostuff and Serializable

Posted by a site user, 2022-04-22 06:39

2 answers

懂视网, 2022-05-13 23:47

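RPC calls can use many serialization formats. Some are general-purpose, such as JSON or the BSON used by MongoDB; some are Java-specific, such as Java's built-in serialization or Hessian; and some are cross-language, such as Thrift and Protocol Buffers (protobuf). Thrift and protobuf produce compact output, but both require generating Java code from a definition file, which is a nuisance: however efficient they are at run time, development is comparatively slow. Hessian sees some use, but it does not feel as capable as protobuf. So I had long been looking for a serialization approach that offers both run-time efficiency and development efficiency. When I came across protostuff online, it looked like exactly what I had been looking for.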

Introduction to protostuff

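One drawback of protobuf is that data structures need a precompilation step: you first write a definition file in .proto format, then use the tool that protobuf provides to generate the corresponding code for each language. Because Java has reflection and dynamic code generation, this precompilation step is not strictly necessary and can be performed while the program runs. protostuff already implements this.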
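To make the idea of "reflection instead of generated code" concrete, here is a minimal sketch that uses only the JDK. It is not how protostuff is implemented; the `ReflectiveWriter` and `User` names are invented for this illustration, and only `int` and `String` fields are handled. It relies on `getDeclaredFields()` returning fields in a stable order, which the JDK does not formally guarantee.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class ReflectiveWriter {

    // Hypothetical message type; no .proto file and no generated class exist for it.
    static final class User {
        int id = 7;
        String name = "bob";
        static int instances;      // static: skipped
        transient String cache;    // transient: skipped
    }

    /** Discovers the fields of obj at run time and writes them one after another. */
    static byte[] write(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Field field : obj.getClass().getDeclaredFields()) {
                int mods = field.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                    continue;
                }
                field.setAccessible(true);
                Object value = field.get(obj);
                if (field.getType() == int.class) {
                    out.writeInt((Integer) value);
                } else if (field.getType() == String.class) {
                    out.writeUTF((String) value);
                } else {
                    throw new IllegalArgumentException("unsupported field type: " + field.getType());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        // 4 bytes for the int, plus a 2-byte length and 3 bytes for "bob".
        System.out.println(write(new User()).length); // prints 9
    }
}
```

A real library adds field numbering, nested messages, and a matching reader on top of this basic idea.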

protostuff performance

  • [Chart not preserved: Ser Time + Deser Time (ns)]

  • [Chart not preserved: Size, Compressed size [light] in bytes]

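Usage

    pom dependencies

    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.0.8</version>
    </dependency>
    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.0.8</version>
    </dependency>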

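    Utility class

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.objenesis.Objenesis;
    import org.objenesis.ObjenesisStd;

    import com.dyuproject.protostuff.LinkedBuffer;
    import com.dyuproject.protostuff.ProtostuffIOUtil;
    import com.dyuproject.protostuff.Schema;
    import com.dyuproject.protostuff.runtime.RuntimeSchema;

    // Note: Objenesis must also be on the classpath; it is not part of the two
    // protostuff artifacts.
    public class SerializationUtil {

        // Schemas are built by reflection, so each one is cached per class.
        private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

        private static Objenesis objenesis = new ObjenesisStd(true);

        private static <T> Schema<T> getSchema(Class<T> clazz) {
            @SuppressWarnings("unchecked")
            Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
            if (schema == null) {
                schema = RuntimeSchema.getSchema(clazz);
                if (schema != null) {
                    cachedSchema.put(clazz, schema);
                }
            }
            return schema;
        }

        /**
         * Serialization
         *
         * @param obj the object to serialize
         * @return the serialized bytes
         */
        public static <T> byte[] serializer(T obj) {
            @SuppressWarnings("unchecked")
            Class<T> clazz = (Class<T>) obj.getClass();
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getSchema(clazz);
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }

        /**
         * Deserialization
         *
         * @param data  the serialized bytes
         * @param clazz the target class
         * @return a new instance of clazz populated from data
         */
        public static <T> T deserializer(byte[] data, Class<T> clazz) {
            try {
                // Objenesis creates the instance without calling a constructor.
                T obj = objenesis.newInstance(clazz);
                Schema<T> schema = getSchema(clazz);
                ProtostuffIOUtil.mergeFrom(data, obj, schema);
                return obj;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    }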

    Netty-based RPC

  • NettyServer

    import java.util.HashMap;
    import java.util.Map;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    // Imports of the project's own classes (HelloServiceImpl, RpcDecoder, ...) are omitted.
    public class NettyServer {

        private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

        private int ioThreadNum;

        // Maximum number of connections the kernel queues for this socket. For a given
        // listening socket the kernel maintains two queues, one for incomplete and one for
        // completed connections; this is the maximum of their combined size.
        private int backlog;

        private int port;
        private Channel channel;
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;

        public NettyServer(int ioThreadNum, int backlog, int port) {
            this.ioThreadNum = ioThreadNum;
            this.backlog = backlog;
            this.port = port;
        }

        public void start() throws InterruptedException {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup(this.ioThreadNum);
            final Map<String, Object> demoService = new HashMap<String, Object>();
            demoService.put("com.patterncat.service.HelloService", new HelloServiceImpl());

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, backlog)
                    // Note: childOption, not option
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new RpcDecoder(RpcRequest.class))
                                    .addLast(new RpcEncoder(RpcResponse.class))
                                    .addLast(new ServerRpcHandler(demoService));
                        }
                    });

            channel = serverBootstrap.bind("127.0.0.1", port).sync().channel();

            logger.info("NettyRPC server listening on port " + port + " and ready for connections...");

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // do shutdown stuff
                }
            });
        }

        public void stop() {
            if (null == channel) {
                throw new ServerStopException();
            }
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            channel.closeFuture().syncUninterruptibly();
            bossGroup = null;
            workerGroup = null;
            channel = null;
        }
    }

  • ServerRpcHandler

    import java.util.Map;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import net.sf.cglib.reflect.FastClass;
    import net.sf.cglib.reflect.FastMethod;

    public class ServerRpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

        private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class);

        // Maps a service class name to the bean that implements it.
        private final Map<String, Object> serviceMapping;

        public ServerRpcHandler(Map<String, Object> serviceMapping) {
            this.serviceMapping = serviceMapping;
        }

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
            RpcResponse response = new RpcResponse();
            response.setTraceId(rpcRequest.getTraceId());
            try {
                logger.info("server handle request:{}", rpcRequest);
                Object result = handle(rpcRequest);
                response.setResult(result);
            } catch (Throwable t) {
                response.setError(t);
            }
            channelHandlerContext.writeAndFlush(response);
        }

        private Object handle(RpcRequest request) throws Throwable {
            String className = request.getClassName();
            Object serviceBean = serviceMapping.get(className);

            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();

            // cglib FastClass invokes the method without java.lang.reflect.
            FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            return serviceFastMethod.invoke(serviceBean, parameters);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.error(cause.getMessage(), cause);
            RpcResponse response = new RpcResponse();
            if (cause instanceof ServerException) {
                response.setTraceId(((ServerException) cause).getTraceId());
            }
            response.setError(cause);
            ctx.writeAndFlush(response);
        }
    }

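The dispatch step in the handler (service name to bean, then method name and parameter types to an invocation) can be tried out without Netty or cglib. The sketch below uses plain `java.lang.reflect` in place of `FastClass`; `DispatchDemo`, `Greeter`, and `GreeterImpl` are made-up names for this illustration. Unlike the handler above, it rejects an unknown service name explicitly rather than failing with a `NullPointerException`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class DispatchDemo {

    // Hypothetical service used only for this example.
    interface Greeter {
        String hello(String name);
    }

    static final class GreeterImpl implements Greeter {
        @Override
        public String hello(String name) {
            return "hello " + name;
        }
    }

    /** Looks up the bean by service name and invokes the named method on it. */
    static Object dispatch(Map<String, Object> services, String serviceName, String methodName,
                           Class<?>[] parameterTypes, Object[] args) {
        Object bean = services.get(serviceName);
        if (bean == null) {
            throw new IllegalArgumentException("no such service: " + serviceName);
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(bean, args);
        } catch (InvocationTargetException e) {
            // The service method itself threw; surface its exception as the cause.
            throw new IllegalStateException("service method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> services = new HashMap<>();
        services.put("demo.Greeter", new GreeterImpl());
        Object result = dispatch(services, "demo.Greeter", "hello",
                new Class<?>[] { String.class }, new Object[] { "world" });
        System.out.println(result); // prints "hello world"
    }
}
```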
  • NettyClient

    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.util.concurrent.TimeUnit;

    import com.google.common.base.Optional;
    // Pair is assumed to be the commons-lang3 class (Pair.of / getKey / getValue).
    import org.apache.commons.lang3.tuple.Pair;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public class NettyClient implements IClient {

        private EventLoopGroup workerGroup;
        private Channel channel;
        private int workerGroupThreads;
        private ClientRpcHandler clientRpcHandler;
        private final Optional<Pair<Long, TimeUnit>> NO_TIMEOUT = Optional.<Pair<Long, TimeUnit>>absent();

        public NettyClient(int workerGroupThreads) {
            this.workerGroupThreads = workerGroupThreads;
        }

        public void connect(InetSocketAddress socketAddress) {
            workerGroup = new NioEventLoopGroup(workerGroupThreads);
            clientRpcHandler = new ClientRpcHandler();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new RpcDecoder(RpcResponse.class))
                                    .addLast(new RpcEncoder(RpcRequest.class))
                                    .addLast(clientRpcHandler);
                        }
                    });

            channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())
                    .syncUninterruptibly()
                    .channel();
        }

        public RpcResponse syncSend(RpcRequest request) throws InterruptedException {
            System.out.println("send request:" + request);
            channel.writeAndFlush(request).sync();
            return clientRpcHandler.send(request, NO_TIMEOUT);
        }

        // Despite the name, this still blocks the caller, but only up to the given timeout.
        public RpcResponse asyncSend(RpcRequest request, TimeUnit timeUnit, long timeout) throws InterruptedException {
            channel.writeAndFlush(request);
            return clientRpcHandler.send(request, Optional.of(Pair.of(timeout, timeUnit)));
        }

        public InetSocketAddress getRemoteAddress() {
            SocketAddress remoteAddress = channel.remoteAddress();
            if (!(remoteAddress instanceof InetSocketAddress)) {
                throw new RuntimeException("Get remote address error, should be InetSocketAddress");
            }
            return (InetSocketAddress) remoteAddress;
        }

        public void close() {
            if (null == channel) {
                throw new ClientCloseException();
            }
            workerGroup.shutdownGracefully();
            channel.closeFuture().syncUninterruptibly();
            workerGroup = null;
            channel = null;
        }
    }
  • ClientRpcHandler

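    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    import com.google.common.base.Optional;
    // Pair is assumed to be the commons-lang3 class (getKey / getValue).
    import org.apache.commons.lang3.tuple.Pair;

    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;

    @ChannelHandler.Sharable
    public class ClientRpcHandler extends SimpleChannelInboundHandler<RpcResponse> {

        // A blocking queue is used mainly for its blocking behavior, which saves
        // writing the locking by hand. The key is the trace ID (assumed to be a String).
        private final ConcurrentHashMap<String, BlockingQueue<RpcResponse>> responseMap =
                new ConcurrentHashMap<String, BlockingQueue<RpcResponse>>();

        // messageReceived
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {
            System.out.println("receive response:" + rpcResponse);
            BlockingQueue<RpcResponse> queue = responseMap.get(rpcResponse.getTraceId());
            if (queue == null) {
                // No caller is waiting: either the call already timed out, or the response
                // arrived before send() registered its queue (send() runs after the request
                // is written). Registering before writing would close that second window.
                System.out.println("no pending call for response:" + rpcResponse);
                return;
            }
            queue.add(rpcResponse);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            cause.printStackTrace();
        }

        public RpcResponse send(RpcRequest request, Optional<Pair<Long, TimeUnit>> timeout) throws InterruptedException {
            responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue<RpcResponse>(1));
            RpcResponse response = null;
            try {
                BlockingQueue<RpcResponse> queue = responseMap.get(request.getTraceId());
                if (timeout == null || !timeout.isPresent()) {
                    response = queue.take();
                } else {
                    // Returns null if the timeout elapses first.
                    response = queue.poll(timeout.get().getKey(), timeout.get().getValue());
                }
            } finally {
                responseMap.remove(request.getTraceId());
            }
            return response;
        }
    }
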
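The request/response correlation in ClientRpcHandler (one single-slot blocking queue per trace ID) can be isolated from Netty. The following is a minimal sketch under a few assumptions: trace IDs and responses are plain strings, and `PendingCalls`, `register`, `complete`, and `await` are hypothetical names. It registers the queue before the request would be written, so a response cannot arrive ahead of its queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PendingCalls {

    private final ConcurrentMap<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    /** Step 1: call this before writing the request to the channel. */
    void register(String traceId) {
        pending.putIfAbsent(traceId, new LinkedBlockingQueue<String>(1));
    }

    /** Called from the I/O thread; returns false when nobody is waiting for this ID. */
    boolean complete(String traceId, String response) {
        BlockingQueue<String> queue = pending.get(traceId);
        return queue != null && queue.offer(response);
    }

    /** Step 2: block until the response arrives; returns null on timeout or interrupt. */
    String await(String traceId, long timeout, TimeUnit unit) {
        BlockingQueue<String> queue = pending.get(traceId);
        if (queue == null) {
            throw new IllegalStateException("not registered: " + traceId);
        }
        try {
            return queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(traceId);
        }
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        calls.register("t1");
        // A second thread stands in for the Netty I/O thread delivering the response.
        new Thread(() -> calls.complete("t1", "pong")).start();
        System.out.println(calls.await("t1", 1, TimeUnit.SECONDS));
    }
}
```

Using `offer` instead of `add` means a duplicate response is ignored rather than raising an exception on the full queue.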
  • decoder

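    import java.util.List;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;

    public class RpcDecoder extends ByteToMessageDecoder {

        private Class<?> genericClass;

        public RpcDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }

        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            // Wait until the 4-byte length prefix has arrived.
            if (byteBuf.readableBytes() < 4) {
                return;
            }
            byteBuf.markReaderIndex();
            int dataLength = byteBuf.readInt();
            if (dataLength < 0) {
                // Corrupt frame: close the connection and stop decoding.
                channelHandlerContext.close();
                return;
            }
            if (byteBuf.readableBytes() < dataLength) {
                // The body is incomplete: rewind and wait for more bytes.
                byteBuf.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            byteBuf.readBytes(data);

            Object obj = SerializationUtil.deserializer(data, genericClass);
            list.add(obj);
        }
    }
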
  • encoder

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;

    public class RpcEncoder extends MessageToByteEncoder {

        private Class<?> genericClass;

        public RpcEncoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }

        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {
            if (genericClass.isInstance(obj)) {
                byte[] data = SerializationUtil.serializer(obj);
                // Frame layout: 4-byte length prefix followed by the serialized body.
                byteBuf.writeInt(data.length);
                byteBuf.writeBytes(data);
            }
        }
    }
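The encoder and decoder together implement length-prefixed framing: each message is a 4-byte length followed by that many bytes. The same logic can be exercised with only `java.nio.ByteBuffer`, which makes the "incomplete frame" case easy to see. This is a sketch with hypothetical names (`FramingDemo`, `encode`, `decode`), not part of the project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FramingDemo {

    /** Prefixes the payload with its length as a 4-byte big-endian int. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Extracts every complete frame from buf (which must be in read mode).
     * Bytes of an incomplete trailing frame are left unread in the buffer.
     */
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // body not complete yet: rewind to the length prefix
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        // Simulate a read that delivers one full frame plus 3 bytes of the next one.
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(frame).put(frame, 0, 3);
        buf.flip();

        List<byte[]> frames = decode(buf);
        System.out.println(frames.size());                                      // prints 1
        System.out.println(new String(frames.get(0), StandardCharsets.UTF_8));  // prints hello
        System.out.println(buf.remaining());                                    // prints 3
    }
}
```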

    Project source: protocol-demo

  • A helpful user, 2022-05-13 20:55

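    Serialization converts a Java object into a byte[]; deserialization converts a byte[] back into a Java object.
    Java's built-in serialization mechanism: java.io.Serializable
    To mark an object as serializable, its class must implement the Serializable interface.
    1. The serialVersionUID of the class used for serialization and of the class used for deserialization must match (this matters when exchanging data with a remote party).
    2. Static fields are not serialized, and fields marked with the transient keyword are not serialized.
    3. When the same object instance is written twice to the same stream, the write is optimized: the second write stores only a reference to the first.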
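Points 2 and 3 can be observed directly with the JDK. In this small sketch the `Account` and `SerializableDemo` classes are invented for the illustration: a transient field comes back as null, and two writes of the same instance come back as one shared object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical class used only for this illustration.
final class Account implements Serializable {
    private static final long serialVersionUID = 1L; // must match on both sides

    static String bank = "demo";  // static: belongs to the class, never written
    String owner;                 // ordinary field: written
    transient String password;    // transient: skipped

    Account(String owner, String password) {
        this.owner = owner;
        this.password = password;
    }
}

final class SerializableDemo {

    /** Writes the same instance twice into one stream and reads both copies back. */
    static Object[] roundTripTwice(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
                out.writeObject(obj); // the second write emits only a back-reference
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return new Object[] { in.readObject(), in.readObject() };
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object[] copies = roundTripTwice(new Account("alice", "secret"));
        Account first = (Account) copies[0];
        System.out.println(first.owner);            // prints alice
        System.out.println(first.password);         // prints null
        System.out.println(copies[0] == copies[1]); // prints true
    }
}
```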
    Protostuff is a serialization library that supports the following serialization formats:
    protobuf
    protostuff (native)
    graph (protostuff with support for cyclic references. See Serializing Object Graphs)
    json
    smile (binary json usable from the protostuff-json module)
    xml
    yaml (serialization only)
    kvp (binary uwsgi header)
    Serialization
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass(); // get the object's class
        // allocate a buffer of the default size with LinkedBuffer
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls); // build the schema that corresponds to the class
            // serialize the object into a byte array with the given schema and return it
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }
    Deserialization
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = objenesis.newInstance(cls); // instantiate the class with objenesis
            Schema<T> schema = getSchema(cls); // build the schema that corresponds to the class
            // merge the byte array into the object with the given schema
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
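    Comparison of advantages and disadvantages:
                    Advantages                                       Disadvantages
    Serializable    Easy to use; works for any class that            Slow; output takes more space
                    implements Serializable
    Protostuff      Fast; based on protobuf                          Needs static compilation when working from
                                                                     .proto files (protostuff-runtime instead
                                                                     builds the schema at run time)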
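The "takes more space" entry for Serializable is easy to check with the JDK alone. The sketch below does not use Protostuff; a hand-written `DataOutputStream` encoding stands in for a compact, schema-based format, and `GridPoint` and `SizeDemo` are names made up for the comparison. Java's built-in format is larger because the stream also carries a header and a class descriptor (class name, serialVersionUID, field names and types).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical value class used only for the size comparison.
final class GridPoint implements Serializable {
    private static final long serialVersionUID = 1L;
    final int x;
    final int y;

    GridPoint(int x, int y) {
        this.x = x;
        this.y = y;
    }
}

final class SizeDemo {

    /** Size in bytes of the object written with Java's built-in serialization. */
    static int javaSerializedSize(GridPoint p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Size in bytes when only the field values are written. */
    static int compactSize(GridPoint p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(p.x);
            out.writeInt(p.y);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        GridPoint p = new GridPoint(3, 4);
        System.out.println("built-in serialization: " + javaSerializedSize(p) + " bytes");
        System.out.println("field values only: " + compactSize(p) + " bytes"); // 8 bytes
    }
}
```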
