跳到主要内容

三十一、RocketMQ源码分析消息轨迹


本文沿着《RocketMQ消息轨迹-设计篇》的思路,从如下3个方面对其源码进行解读:

1、 发送消息轨迹;
2、 消息轨迹格式;
3、 存储消息轨迹数据;

本节目录

  • 1、发送消息轨迹流程

  • 1.1 DefaultMQProducer构造函数

  • 1.2 SendMessageTraceHookImpl钩子函数

  • 1.2.1 SendMessageTraceHookImpl类图

    1.2.2 源码分析SendMessageTraceHookImpl

    • 1.2.2.1 sendMessageBefore
    • 1.2.2.2 sendMessageAfter
  • 1.3 TraceDispatcher实现原理

  • 1.3.1 TraceDispatcher构造函数

    1.3.2 getAndCreateTraceProducer详解

    1.3.3 start

    1.3.4 AsyncRunnable

    1.3.5 AsyncAppenderRequest\#sendTraceData

    1.3.6 TraceDataEncoder\#encoderFromContextBean
    • 1.3.6.1 PUB(消息发送)
    • 1.3.6.2 SubBefore(消息消费之前)
    • 1.3.2.3 SubAfter(消息消费后)

2、 消息轨迹数据如何存储;

  • 2.1 使用系统默认的主题名称
  • 2.2 用户自定义消息轨迹主题

1、发送消息轨迹流程

首先我们来看一下在消息发送端如何启用消息轨迹,示例代码如下:

public class TraceProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); // @1
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}