【Kafka】使用Wireshark抓包分析Kafka通信协议

Wireshark

什么是Wireshark?

Wireshark (前身 Ethereal)是一个网络封包分析软件。网络封包分析软件的功能是撷取网络封包,并尽可能显示出最为详细的网络封包资料。

是目前全球使用最广泛的开源抓包软件,其前身为Ethereal,是一个通用的网络数据嗅探器和协议分析器,由Gerald Combs编写并于1998年以GPL开源许可证发布。如果是网络工程师,可以通过Wireshark对网络进行 故障定位和排错; 如果安全工程师,可以通过Wireshark对网络 黑客渗透攻击进行快速定位并找出攻击源; 如果是测试或软件工程师,可以通过Wireshark 分析底层通讯机制等

Wireshark下载地址

界面介绍

打开WireShark,整个界面分为两部分——工具栏和窗格

image.png

最上面是工具栏,包含两部分

  • 主工具栏:提供从菜单快速访问常用项目的功能,该工具栏不能由用户自定义
  • “Filter” 工具栏:可以快速编辑和应用显示过滤器。

窗格从上到下总共有3块区域

Packet List窗格:显示当前捕获文件中的所有数据包

Packet Details窗格:数据包详细信息窗格以更详细的形式显示当前数据包

Packet Bytes窗格:数据包字节窗格以十六进制转储样式显示当前数据包的数据

使用显示过滤器

Wireshark 提供了一种显示过滤器语言,可以精确地控制显示哪些数据包。它们可用于检查协议或字段的存在,字段的值,甚至可以将两个字段相互比较。

显示过滤器字段

最简单的显示过滤器是显示单个协议的过滤器。要仅显示包含特定协议的数据包,请在 Wireshark 的显示过滤器工具栏中键入该协议。

例如,要仅显示 Kafka 数据包,请在 Wireshark 的显示过滤器工具栏中键入 kafka.

image.png

Wireshark内置支持的协议类型非常多,可以参考: https://www.wireshark.org/docs/dfref/

Wireshark支持的Kafka协议字段可参考此链接: https://www.wireshark.org/docs/dfref/k/kafka.html

比较值

可以使用多个不同的比较运算符来构建用于比较值的显示过滤器。

例如,要仅显示去往或来自 IP 地址 192.168.0.1 的数据包,请使用 ip.addr==192.168.0.1。

列出了可用的比较运算符的完整列表

image.png

Kafka通信协议

Kafka的Producer、Broker和Consumer之间采用的是一套自行设计的基于TCP层的协议。Kafka的这套协议完全是为了Kafka自身的业务需求而定制的,协议定义了所有 API 的请求及响应消息。

概述

Kafka 协议是相当简单的,只有六个核心客户端请求 API:

  1. 元数据(Metadata) – 描述当前可用的 brokers,brokers 的主机和端口信息,并提供了哪个 broker 托管了哪些分区的信息;
  2. 发送(Send) – 发送消息到 broker;
  3. 获取(Fetch) – 从 broker 上获取消息。主要分三类:一个用于获取数据,一个用于获取集群的元数据,还有一个用于获取 topic 的偏移量信息;
  4. 偏移量(Offsets) – 获取给定 topic 分区的可用偏移量信息;
  5. 提交偏移量(Offset Commit) – 提交消费者组(Consumer Group)的一组偏移量;
  6. 获取偏移量(Offset Fetch) – 为消费者组获取一组偏移量

此外,从 0.9 版本开始,Kafka 支持为消费者和 Kafka 连接进行分组管理。客户端 API 包括五个请求:

  1. 分组协调者(GroupCoordinator) – 用来定位一个分组的当前协调者。
  2. 加入分组(JoinGroup) – 成为某个分组的成员,当分组不存在(没有一个成员时)则创建分组。
  3. 同步分组(SyncGroup) – 同步分组中所有成员的状态(例如分发分区分配信息(Partition Assignments)到各个组员)。
  4. 心跳(Heartbeat) – 保持组内成员的活跃状态。
  5. 离开分组(LeaveGroup) – 直接离开一个组。

最后,有几个管理 API,可用于监控/管理 Kafka 集群:

  1. 描述消费者组(DescribeGroups) – 用于检查一组群体的当前状态(如:查看消费者分区分配)。
  2. 列出组(ListGroups) – 列出某个 broker 当前管理的所有组

这里不针对性细讲,完整的协议介绍可以参考:

版本和兼容性

协议的目的要达到在向后兼容的基础上渐进演化。版本是基于每个API基础之上,每个版本包括一个请求和响应对。每个请求包含API Key,里面包含了被调用的API标识,以及表示这些请求和响应格式的版本号。

0.9.0.1 Kafka集群支持如下ApiKey的请求

PRODUCE(0, "Produce"),

FETCH(1, "Fetch"),

LIST_OFFSETS(2, "Offsets"),

METADATA(3, "Metadata"),

LEADER_AND_ISR(4, "LeaderAndIsr"),

STOP_REPLICA(5, "StopReplica"),

UPDATE_METADATA_KEY(6, "UpdateMetadata"),

CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),

OFFSET_COMMIT(8, "OffsetCommit"),

OFFSET_FETCH(9, "OffsetFetch"),

GROUP_COORDINATOR(10, "GroupCoordinator"),

JOIN_GROUP(11, "JoinGroup"),

HEARTBEAT(12, "Heartbeat"),

LEAVE_GROUP(13, "LeaveGroup"),

SYNC_GROUP(14, "SyncGroup"),

DESCRIBE_GROUPS(15, "DescribeGroups"),

LIST_GROUPS(16, "ListGroups");

服务器会拒绝它不支持的版本的请求,并始终返回它期望收到的能够完成请求响应的版本的协议格式。

通用的请求和响应格式

所有请求和响应都从以下语法为基础

RequestOrResponse => Size (RequestMessage | ResponseMessage)  Size => int32

描述

MessageSize

MessageSize 域给出了后续请求或响应消息的字节(bytes)长度。客户端可以先读取4字节的长度N,然后读取并解析后续的N字节请求内容

请求(Requests)
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage

  ApiKey => int16

  ApiVersion => int16

  CorrelationId => int32

  ClientId => string

  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

描述

ApiKey

这是一个表示所调用的API的数字id(即它表示是一个元数据请求?生产请求?获取请求等)

ApiVersion

这是该API的一个数字版本号。我们为每个API定义一个版本号,该版本号允许服务器根据版本号正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式

CorrelationId

这是一个用户提供的整数。它将会被服务器原封不动地回传给客户端。用于匹配客户机和服务器之间的请求和响应

ClientId

这是为客户端应用程序的自定义的标识。用户可以使用他们喜欢的任何标识符,他们会被用在记录错误时,监测统计信息等场景。例如,你可能不仅想要监视每秒的总体请求,还要根据客户端应用程序进行监视,那它就可以被用上(其中每一个都将驻留在多个服务器上)。这个ID作为特定的客户端对所有的请求的逻辑分组

响应(Responses)
Response => CorrelationId ResponseMessage

CorrelationId => int32

ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse

描述

CorrelationId

服务器传回给客户端它所提供用作关联请求和响应消息的整数

所有响应都是与请求成对匹配(例如,发送回一个元数据请求,就会得到一个元数据响应)。

案例

kafka高版本Client连接0.9Serve

有高版本客户端连接0.9 Kafka集群时会出现生产和写入问题。

查看服务端日志如下:

image.png

看到java.lang.ArrayIndexOutOfBoundsException: 18这个关键字报错,可以明确有apikey=18的请求访问0.9集群,从前面可以知道0.9集群ApiKey最大支持到16,当前要找出是哪个任务用高版本客户端访问该0.9集群

使用tcpdump+Wireshark抓包分析

tcpdump抓包

在服务端,根据kafka所使用9092端口抓包

tcpdump -i any -nn -vv tcp port 9092 -s 0 -w kafka.cap

wireshark分析

wireshark可能未能自动识别出kafka协议。首先检查一下Wireshark是否支持kafka协议解析。

image.png

出现以上信息说明wireshark支持Kafka协议,如果没有的话,更新wireshark最新版即可。当前笔者使用的是Version 3.4.5

接下来点选中一条数据消息右键,点击“Decode As”,在弹出窗口的“当前”下拉列表中选择“kafka”,然后点击“OK”。

image.png

可以看到除了tcp控制报文外,其他报文都被解析成kafka协议(如解析不出来,可尝试退出wireshark重新打开)。

image.png

Decode As临时设置解码器,退出Wireshark以后,这些设置会丢失

在“Filter” 工具栏中输入kafka.api\\_key == 18 搜索apikey=18的请求来自哪个ip和端口

image.png

根据来源IP找到是实时计算集群,结合作业发布平台找出对应时间段可能的任务一一核实,找开发确认后将任务停掉恢复。

在案例中,之前处理方案是Kafka开启Trace日志重启,根据日志的最近的报错IP来猜测,具有一定的随机性,使用Wireshark工具分析可以又快又准的找出来。

查看Fetch请求或响应报文的详细字段

Kafka Fetch Request

可以看到向两个Partition 分区53和13请求消息,53分区请求offset是3605043491,Max Bytes是1MB。

image.png

Kafka Fetch Response

可以看到返回两个Partition 分区53和13的消息,53返回的是offset是3605043491,消息大小是2981B

image.png

总结

Wireshark在支持协议的数量方面是出类拔萃的,目前已提供了超过上千种协议的支持。这些协议包括从最基础的IP协议和DHCP协议到高级的专用协议比如Appletalk和Bittorrente等。

Wireshark从1.12.0版本开始支持Kafka通信协议,到现在最新的3.4.5更完善支持协议。通过Wireshark分析学习Kafka通信协议加深对Kafka的理解和问题处理。

由于Wireshark在开源模式下进行开发,每次更新都会增加一些对新协议的支持。后续鲲鹏运维将考虑对Pulsar协议的支持调研。

参考资料:

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
【Kafka】使用Wireshark抓包分析Kafka通信协议
Wireshark (前身 Ethereal)是一个网络封包分析软件。网络封包分析软件的功能是撷取网络封包,并尽可能显示出最为详细的网络封包资料。
<<上一篇
下一篇>>