探索Rabbitmq的Java客户端

探索Rabbitmq的Java客户端

AMQPConnection

实例初始化

创建Connection时会通过FrameHandlerFacotry创建一个SocketFrameHandler,SocketFrameHandler对Socket进行了封装。

public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
    {
        checkPreconditions();
        this.username = params.getUsername();
        this.password = params.getPassword();
        this._frameHandler = frameHandler;
        this._virtualHost = params.getVirtualHost();
        this._exceptionHandler = params.getExceptionHandler();

        this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
        this.requestedFrameMax = params.getRequestedFrameMax();
        this.requestedChannelMax = params.getRequestedChannelMax();
        this.requestedHeartbeat = params.getRequestedHeartbeat();
        this.shutdownTimeout = params.getShutdownTimeout();
        this.saslConfig = params.getSaslConfig();
        this.executor = params.getExecutor();
        this.threadFactory = params.getThreadFactory();

        this._channelManager = null;

        this._brokerInitiatedShutdown = false;

        this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
    }

启动连接

初始化WorkService和HeartBeatSender。

创建一个channel0的AMQChannel,这个channel不会被ChannelManager管理

首先channel0会将一个BlockingRpcContinuation作为当前未完成的Rpc请求,用于接收handshake的响应。

然后channel0会向socket中写入一条只有header的消息作为handshake,header中包含了客户端的版本号。

紧接着会启动主循环线程,主循环会通过SocketFrameHandler从socket接收字节流。此时接收到的第一条数据是服务端响应handshake返回的Connection.Start信息(包含服务端版本、机制、基础信息)。

主循环线程启动后,主线程会阻塞地等待服务端的handshake响应。拿到响应之后会对服务器回传的信息进行比对,然后发送Connection.StartOK的信息去服务端(这个请求也还是阻塞式的),等待服务端回传Connection.Tune(包含最大channel数、最大frame长度和heartbeat间隔)。将这些信息与实例初始化是的设置信息进行对比,初始化ChannelManager

紧接着发送Connection.TuneOk和Connection.Open消息去服务端,完成connection的建立。

Connection > MainLoop > readFrame

消息体

Frame是对AMQP消息的封装:包含frame的type、channel号、消息内容

type|channelNumber|payloadSize|payload|frameEndMarker

Payload包含了消息类型、消息头和消息主题

method|header|body

消息发送和接收

消息的发送和接收都要channel来完成。

创建Channel

通过Connection的ChannelManager来创建Channel,通过指定的ChannelNumber或者由分配器分配。创建好的Channel实例会放入ChannelManager的Map中,key为ChannelNumber。由此可见Channel是Connection唯一的。

public ChannelN createChannel(AMQConnection connection);
public ChannelN createChannel(AMQConnection connection, int channelNumber);
private ChannelN addNewChannel(AMQConnection connection, int channelNumber);
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService);

Channel实例化之后会调用Channel.open方法,发送Channel.Open去服务端(阻塞式),等待服务端响应Channel.OpenOk。

消息发送

Channel.transmit 发送消息,调用AMQCommand.transmit完成发送。

AMQCommand.transmit将消息封装成Frame,通过connection的SocketFrameHandler写入OutpuStream。

消息接收

主循环线程在链接创建完成后会监听socket,从InputStream中读取二进制流封装成Frame。通过Frame中的ChannelNumber从ChannelManager中获取对应的Channel实例处理Frame。

while (_running) {
    Frame frame = _frameHandler.readFrame();
    if (frame != null) {
        _missedHeartbeats = 0;
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            // Ignore it: we've already just reset the heartbeat counter.
        } else {
            if (frame.channel == 0) { // the special channel
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    // If we're still _running, but not isOpen(), then we
                    // must be quiescing, which means any inbound frames
                    // for non-zero channels (and any inbound commands on
                    // channel zero that aren't Connection.CloseOk) must
                    // be discarded.
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        cm.getChannel(frame.channel).handleFrame(frame);
                    }
                }
            }
        }
    } else {
        // Socket timeout waiting for a frame.
        // Maybe missed heartbeat.
        handleSocketTimeout();
    }
}

Channel会使用已经准备好的AMQCommand处理Frame,并未下一个Frame准备新的AMQCommand。

public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}

AMQCommad会使用CommandAssembler依次从Frame的payload中检出对应的Method、Header和Body。如果检出了Body,整个Frame会被检出完成。如过未完成,会进入主循环再次处理直至完成。

public synchronized boolean handleFrame(Frame f) throws IOException
{
    switch (this.state) {
      case EXPECTING_METHOD:          consumeMethodFrame(f); break;
      case EXPECTING_CONTENT_HEADER:  consumeHeaderFrame(f); break;
      case EXPECTING_CONTENT_BODY:    consumeBodyFrame(f);   break;
      default:
          throw new AssertionError("Bad Command State " + this.state);
    }
    return isComplete();
}

Frame被检出完后,会根据Method的类型进入不同的异步处理流程。

Method在channel打开和关闭的情况下会以下的可能:

Channel打开:Basic.Deliver, Basic.Return, Basic.Flow, Basic.Ack, Basic.Nack, Basic.RecoveryOk, Basic.Cancel

Channel关闭:Channel.CloseOk

生产和消费

生产

调用Channel.basicPublish()方法,指定exchange、routingKey等信息,消息属性、消息体。封装成Baisc.Publish,放入AMQCommand,最后调用transmit方法完成发送。参考消息发送

public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
    throws IOException
{
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }
    BasicProperties useProps = props;
    if (props == null) {
        useProps = MessageProperties.MINIMAL_BASIC;
    }
    transmit(new AMQCommand(new Basic.Publish.Builder()
                                .exchange(exchange)
                                .routingKey(routingKey)
                                .mandatory(mandatory)
                                .immediate(immediate)
                            .build(),
                            useProps, body));
}

消费

创建QueueingConsumer实例,然后调用Channel.basicConsume方法。

queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume("queue_name", queueingConsumer);
new Thread() {
    @Override
    public void run() {
        while (true) {
            try {
                final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                new Thread() {
                    @Override
                    public void run() {
                      	try{
                            delivery.getEnvelope();//消息头
                            delivery.getProperties();//消息属性
                            delivery.getBody()//消息体
                      	}finally{
                          //channel.basicAck();
                          //channel.basicNack()
                      	}
                    }
                }.start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}.start();

QueueingConsumer实现了Consumer接口。

Channel.basicConsume方法会封装Channel.Consume消息发送到服务端(阻塞式),等待服务端的Channel.ConsumeOk响应(包含了服务端为Consumer分配的ConsumerTag)。然后将QueueingConsumer放入Map中,key为ConsumerTag。consumer是Channel唯一。

当客户端接收到消息,参考消息接收。Basic.Deliver类型的消息(consumerTag、deliveryTag、redelivered、exchange、routingKey)会进入消费处理流程。Channel根据ConsumerTag从Map中获取对应的QueueConsumer实例,由Channel的ConsumerDispatcher通过Connection初始化的WorkService创建新的处理线程,调用QueueConsumer实例的handleDelivery方法。QueueConsumer将消息封装成Delivery对象,放入BlockingQueue中。

消费线程等待新的Delivery(阻塞式),之后创建新的线程完成消息的处理。

(转载本站文章请注明作者和出处乱世浮生,请勿用于任何商业用途)

comments powered by Disqus