使用 Apache mina 建立长连接

由于业务需要,有一些实时性要求高的接口从无状态的 HTTP 连接切换到了长连接, 所以最近基于 Apache mina 来实现了长连接.主要说明一下 Android 客户端的实现.

依赖

对于 Android 客户端来说, 不需要引入太多的包.

首先下载 mina , 目前我用的版本是 2.0.13, 解压后引入 mina-core-2.0.13.jar 包. 然后下载 slf4j-android 包,引入 slf4j-android-1.6.1-RC1.jar包.只需两个 jar 包即可.

连接

主要代码如下(注意一定不能在主线程中连接):

SocketConnector mSocketConnector = new NioSocketConnector();
//设置协议封装解析处理
mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
//设置心跳包
KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartbeatMessageFactory());
heartFilter.setRequestInterval(5 * 60);
heartFilter.setRequestTimeout(10);
mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter);
//设置 handler 处理业务逻辑
mSocketConnector.setHandler(new SocketClientHandler());
   //配置服务器地址
InetSocketAddress mSocketAddress = new InetSocketAddress(IP, PORT);
ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
mFuture.awaitUninterruptibly();
IoSession mSession = mFuture.getSession();

TCP 的连接重点在于协议的处理和心跳.由于三次握手和粘包等问题已经在框架中处理好了,所以只需要配置协议的封装处理, 心跳包的设计业务逻辑的处理皆可.

前两者可以使用 mina 框架的 filterchain 来处理, 通过设置 Filter 使数据的发送返回结果更加符合自己的需求. addLast 等方法只是通过 key, value 的形式添加 Filter, 前面的 key 可以随意. Filter 的用法可以参考 mina官网.

协议的封装处理.

在项目中,我把 FrameCodecFactory 用作第一个 Filter, 对接收到的数据做第一步处理, 解析为我想要的格式, 同时把发出去的数据封装为服务器接受的格式.

这里项目中所使用的是 Frame 协议, 前四位表示数据的长度,后面接收该长度的数据,解析处理.

public class FrameCodecFactory implements ProtocolCodecFactory {

    @Override
    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return new FrameEncoder();
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return new FrameDecoder();
    }
}

主要内容在于 Encoder 和 Decoder.

Encoder 用于封装你所发出的数据, 封装为服务器接受的格式. 这里是将 String 转为 Frame 协议.

public class FrameEncoder implements ProtocolEncoder {

    @Override
    public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
        if (o instanceof String) {
            String messageString = (String) o;
            //封装为 Frame 协议
            byte[] messageBytes = messageString.getBytes(Charset.forName("UTF-8"));
            int totalSize = messageBytes.length + 4;
            IoBuffer buffer = IoBuffer.allocate(totalSize);
            buffer.putInt(totalSize);
            buffer.put(messageBytes);
            buffer.flip();
            protocolEncoderOutput.write(buffer);
        }
    }

    @Override
    public void dispose(IoSession ioSession) throws Exception {

    }
}

Decoder 用于解析接收到的数据, 将 Frame 格式的数据流解析为 String.

public class FrameDecoder extends CumulativeProtocolDecoder {

    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        int totalLength = ioBuffer.getInt();
        int messageLength = totalLength - 4;
        if (ioBuffer.remaining() >= messageLength) {
            String messageString = ioBuffer.getString(messageLength, Charset.forName("UTF-8").newDecoder());
            protocolDecoderOutput.write(messageString);
            return true;
        }
        return false;
    }
}

在 FrameCodecFactory 这个 Filter 处理完后, 对于客户端的处理便可以使用 String 来传递解析数据了.

心跳包的处理

心跳包是判断 TCP 连接是否在线的关键, 在这里直接继承 KeepAliveMessageFactory ,实现对应的方法即可.

public class HeartbeatMessageFactory implements KeepAliveMessageFactory {

    @Override
    public boolean isRequest(IoSession ioSession, Object o) {
           //如果是客户端主动向服务器发起的心跳包, return true, 该框架会发送 getRequest() 方法返回的心跳包内容. 
        return false;
    }

    @Override
    public boolean isResponse(IoSession ioSession, Object o) {
        //如果是服务器发送过来的心跳包, return true后会在 getResponse() 方法中处理心跳包.
        return false;
    }

    @Override
    public Object getRequest(IoSession ioSession) {
        //自定义向服务器发送的心跳包内容.
        return null;
    }

    @Override
    public Object getResponse(IoSession ioSession, Object o) {
        //自定义解析服务器发送过来的心跳包. 
       return null;
    }
}

主要的心跳包内容及格式都可以在这里处理, 在实例化这个对象时可以设置间隔及超时时间.

KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartbeatMessageFactory());
//每 5 分钟发送一个心跳包
heartFilter.setRequestInterval(5 * 60);
//心跳包超时时间 10s
heartFilter.setRequestTimeout(10);

业务逻辑处理

数据通过 Filter 过滤后, 便会到设置的 Handler 的 messageReceived() 方法中回调. 我在这里处理对应的业务逻辑.

public class SocketClientHandler extends IoHandlerAdapter {

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        //处理业务,分发数据,可以利用广播等方式.
    }

}

断开链接

断开链接也不能在主线程中调用.

mFuture.cancel();
   mSession.closeNow();
   mSession.getCloseFuture().setClosed();
   mSession.getCloseFuture().awaitUninterruptibly();
//如果完全不连接了
mSocketConnector.dispose();

总结

之前没有实现过长连接, 一开始做的时候还以为要自己实现三次握手,拆包等问题,后面才发现这些东西都已经通过 mina 实现好了, 同时无论 netty 还是 mina, 通过 filterchain 这种方式对消息处理的解耦很方便, 每个步骤只需要实现它对应需要实现的方法即可.客户端只需要维护一个 client 对象, 通过广播等方式分发数据, 目前实现还没有遇到什么大坑~.