Kafka Loader用户手册

总览

Kafka是一款目前十分流行的“发布-订阅”消息中间件系统; 通过Kafka, 系统可以向用户提供一整套实时的、可容错的分布式管道服务。TigerGraph的Kafka加载器能够让用户非常便捷地与现有的Kafka集群整合,从而加快数据的实时分析。同时,用户还能够通过使用一系列Kafka生态系统中的插件轻松实现架构扩展。

Kafka加载器会读取并处理Kafka集群中的数据,并将结果导入TigerGraph系统中。

系统架构

简单来说,Kafka加载器的工作方式为: 用户通过GSQL命令告诉TigerGraph系统要做什么,然后TigerGraph系统从外部的Kafka集群中将数据导入它的RestPP服务器中。下图简要地展示了Kafka加载器的基本架构:

准备工作

Kafka集群已经正确配置并正常工作。

用户需要准备以下两个Kafka集群的配置文件,并将文件复制到TigerGraph系统规定的目录中:

  1. Kafka配置文件: 这个文件包含了外部Kafka服务器(broker)的域名和端口号。TigerGraph系统通过该配置文找到并连接Kafka集群。 具体配置步骤请参考 第一步: 定义数据源.

  2. Kafka话题(topic)与分区(Partition)配置文件: 该文件包含了Kafka的话题和分区列表, 并包含加载数据的起始偏移量。 具体配置步骤请参考 第二步: 创建加载作业.

配置和使用Kafka加载器

使用Kafka加载器的三个主要步骤为:

Kafka加载器所使用的GSQL语法与现有的GSQL加载语法相同。

1. 定义数据源

CREATE DATA_SOURCE

在创建Kafka数据源的加载作业前,用户首先要定义Kafka的服务器地址。 用户可以通过CREATE DATA_SOURCE语句定义一个data_source变量:

Kafka数据源配置文件

在数据源创建之后,用户需要使用SET命令配置针对该数据源的配置文件:

在执行过程中,该SET命令会读取并检查该配置文件。如果没有问题,则会将该配置文件整合到TigerGraph的字典中。 数据源配置文件为JSON格式对象,包含了Kafka服务器的全局配置信息,即数据源的IP地址和端口号。 下面为一个非常简单的kafka.conf文件的例子:

“broker”参数是必须的,而"kafka_config"参数(提供一些额外的配置信息,详情请参考Kafka的官方文档)则为可选值。 用户可以通过一系列键值对的形式定义"kafka_config"参数。 例如:

用户也可以将CREATE DATA_SOURCE语句和SET语句合二为一:

高级功能: 多图模式的支持

Kafka加载器支持TigerGraph的多图模式。 在多图模式的情况下, 数据源可以是私有的,也可以是全局的。

  1. 全局数据源只能由超级管理员用户创建, 且必须由超级管理员分配到每个图中。

  2. 普通管理员只能创建仅适用于本图的私有数据源。 该数据源不能分配给其他的图使用。

下面的例子列出了几种允许的DATA_SOURCE操作的示例:

1.一名超级管理员可以创建全局数据源,并且不将其分配给任何图:

2. 一名超级管理员可以向(从)一张或多张图中赋予(收回)一个数据源:

3. 一名普通管理员可以为他管理的图创建私有数据源:

DROP DATA_SOURCE

一个data_source变量可以由拥有权限的用户删除; 例如,全局数据源只能被超级管理员删除, 而私有数据源则即可以被该图的普通管理员删除,也可以被超级管理员删除。 DROP语句的语法格式如下:

下面列出的创建或删除数据源命令均为合法命令:

SHOW DATA_SOURCE

拥有权限的用户可以使用SHOW DATA_SOURCE语句读取所有现有的数据源信息:

2. 创建加载作业

Kafka加载器使用与标准GSQL加载作业相同的 创建加载作业语法。 用户需要使用DEFINE FILENAME语句声明一个FILENAME变量, 用于让加载器找到Kafka的配置文件。

同时, 用户也可以选择在RUN LOADING JOB语句中使用USING方法来设定一个新的文件地址。 RUN语句中的文件地址将覆盖之前在CREATE LOADING JOB中配置的文件地址。

下例展示了用于Kafka加载器的DEFINE FILENAME的语法。 在该语句中, $DATA_SOURCE_NAME参数表示Kafka的数据源名称以及一条指向配置文件的路径地址, 该配置文件包含了kafka集群的话题及分区信息。

示例: 加载一个Kafka数据源 k1 , 话题及分区信息配置文件地址为: "~/topic_partition1.conf":

Kafka Topic-Partition 配置文件

Kafka Topic-Partition配置文件用于告诉TigerGraph如何阅读Kafka中的数据。 与之前的数据源配置文件类似,该话题-分区配置文件也是以JSON格式保存的。 下面是一个例子:

"topic"参数值是必须的。 "partition_list"参数值则是可选的, 该可选参数可用于声明需要读取的话题及分区,以及开始点的偏移量。 如果"partition_list"值为空,则表示该话题下的所有分区都会被加载。 默认的起始点偏移量为 “-1”, 表示加载作业从最新的消息开始(例如从该话题的末尾开始加载)。 如果用户希望从话题的起始行开始加载, 则"start_offset"的参数值应该设为“-2”。

用户可以通过修改Kafka Topic-Partition文件的"default_start_offset"参数值来修改默认的偏移量。 例如:

除了直接配置参数文件地址之外, 用户也可以选择直接将Topic-Partition 配置信息编辑成字符串, 例如:

3. 执行加载作业

Kafka加载器使用与标准GSQL执行加载作业相同的执行加载作业语法。 用户可以向每个文件名变量输入一个形似“DATA_SOURCE Var:topic_partition configure”一样的字符串, 用于覆盖加载作业中早先定义的文件名。 在下面的例子中, f1使用的是在CREATE LOADING JOB中赋予的值, 而f3和f4则使用在RUN语句中赋予的值。

一次加载作业中的所有文件名变量必须指向同一个DATA_SOURCE变量。

Kafka加载器有两种加载模式: 流模式(Streaming Mode)和EOF模式。 默认模式为流模式。 在流模式中, 加载器的加载动作只有在作业停止后才会停止。 而在EOF模式中, 加载器在完成读取所有的当前的Kafka消息后便会停止。

如果需要将模式修改为EOF模式,需要在RUN LOADING JOB语句后添加一个参数:

管理加载作业

管理Kafka的加载作业与管理普通加载作业的方法相同。三个主要的管理命令为:

  • SHOW LOADING STATUS

  • ABORT LOADING JOB

  • RESUME LOADING JOB

例如: SHOW LOADING STATUS命令的语法如下:

如果想单独读取某个作业的情况, 用户需要在RUN LOADING JOB语句中添加job_id参数。 对于针对单独作业的SHOW命令来说, 下面的信息会显示出来:

  1. 对于每一个分区, 当前已加载的偏移量

  2. 平均加载速度

  3. 加载数据量

  4. 加载持续时间

详情请参考: 监控与管理加载作业

Kafka加载器实际案例

下面将演示一个通过Kafka加载器导入数据的案例: