TigerGraph文档
2.4
Search…
⌃K

Kafka Loader用户手册

总览

Kafka是一款目前十分流行的“发布-订阅”消息中间件系统; 通过Kafka, 系统可以向用户提供一整套实时的、可容错的分布式管道服务。TigerGraph的Kafka加载器能够让用户非常便捷地与现有的Kafka集群整合,从而加快数据的实时分析。同时,用户还能够通过使用一系列Kafka生态系统中的插件轻松实现架构扩展。
Kafka加载器会读取并处理Kafka集群中的数据,并将结果导入TigerGraph系统中。

系统架构

简单来说,Kafka加载器的工作方式为: 用户通过GSQL命令告诉TigerGraph系统要做什么,然后TigerGraph系统从外部的Kafka集群中将数据导入它的RestPP服务器中。下图简要地展示了Kafka加载器的基本架构:

准备工作

Kafka集群已经正确配置并正常工作。
用户需要准备以下两个Kafka集群的配置文件,并将文件复制到TigerGraph系统规定的目录中:
  1. 1.
    Kafka配置文件: 这个文件包含了外部Kafka服务器(broker)的域名和端口号。TigerGraph系统通过该配置文找到并连接Kafka集群。 具体配置步骤请参考 第一步: 定义数据源.
  2. 2.
    Kafka话题(topic)与分区(Partition)配置文件: 该文件包含了Kafka的话题和分区列表, 并包含加载数据的起始偏移量。 具体配置步骤请参考 第二步: 创建加载作业.

配置和使用Kafka加载器

使用Kafka加载器的三个主要步骤为:
Kafka加载器所使用的GSQL语法与现有的GSQL加载语法相同。

1. 定义数据源

CREATE DATA_SOURCE
在创建Kafka数据源的加载作业前,用户首先要定义Kafka的服务器地址。 用户可以通过CREATE DATA_SOURCE语句定义一个data_source变量:
CREATE DATA_SOURCE KAFKA data_source_name
Kafka数据源配置文件
在数据源创建之后,用户需要使用SET命令配置针对该数据源的配置文件:
SET data_source_name = "/path/to/kafka.config"
在执行过程中,该SET命令会读取并检查该配置文件。如果没有问题,则会将该配置文件整合到TigerGraph的字典中。 数据源配置文件为JSON格式对象,包含了Kafka服务器的全局配置信息,即数据源的IP地址和端口号。 下面为一个非常简单的kafka.conf文件的例子:
{
"broker": "broker.full.domain.name:9092",
}
“broker”参数是必须的,而"kafka_config"参数(提供一些额外的配置信息,详情请参考Kafka的官方文档)则为可选值。 用户可以通过一系列键值对的形式定义"kafka_config"参数。 例如:
{
"broker": "broker.full.domain.name:9092",
"kafka_config": {"group.id":"tigergraph"}
}
用户也可以将CREATE DATA_SOURCE语句和SET语句合二为一:
CREATE DATA_SOURCE KAFKA data_source_name = "/path/to/kafka.config"
  1. 1.
    如果用户的TigerGraph为集群架构,则上述配置文件必须保存在m1节点(即同时拥有GSQL server和GSQL client的节点)上,且必须为JSON格式。 如果配置文件中使用了相对路径, 则相对路径的起始点必须是GSQL客户端的工作目录。
  2. 2.
    每次更改过配置文件的内容后,用户都必须执行"SET data_source_name"命令来更新字典中的数据源信息。

高级功能: 多图模式的支持

Kafka加载器支持TigerGraph的多图模式。 在多图模式的情况下, 数据源可以是私有的,也可以是全局的。
  1. 1.
    全局数据源只能由超级管理员用户创建, 且必须由超级管理员分配到每个图中。
  2. 2.
    普通管理员只能创建仅适用于本图的私有数据源。 该数据源不能分配给其他的图使用。
下面的例子列出了几种允许的DATA_SOURCE操作的示例:
1.一名超级管理员可以创建全局数据源,并且不将其分配给任何图:
CREATE DATA_SOURCE KAFKA k1 = "/path/to/config"
2. 一名超级管理员可以向(从)一张或多张图中赋予(收回)一个数据源:
GRANT DATA_SOURCE k1 TO GRAPH graph1, graph2
REVOKE DATA_SOURCE k1 FROM GRAPH graph1, graph2
3. 一名普通管理员可以为他管理的图创建私有数据源:
CREATE DATA_SOURCE KAFKA k1 = "/path/to/config" FOR GRAPH test_graph
在上面的例子中,私有数据源kl只能被test_graph访问。

DROP DATA_SOURCE

一个data_source变量可以由拥有权限的用户删除; 例如,全局数据源只能被超级管理员删除, 而私有数据源则即可以被该图的普通管理员删除,也可以被超级管理员删除。 DROP语句的语法格式如下:
DROP DATA_SOURCE <source1>[<source2>...] | * | ALL
下面列出的创建或删除数据源命令均为合法命令:
CREATE DATA_SOURCE KAFKA k1 = "/home/tigergraph/kafka.conf"
CREATE DATA_SOURCE KAFKA k2 = "/home/tigergraph/kafka2.conf"
DROP DATA_SOURCE k1, k2
DROP DATA_SOURCE *
DROP DATA_SOURCE ALL

SHOW DATA_SOURCE

拥有权限的用户可以使用SHOW DATA_SOURCE语句读取所有现有的数据源信息:
$ GSQL SHOW DATA_SOURCE *
# 示例输出:
Data Source:
- KAFKA k1 ("127.0.0.1:9092")
The global data source will be shown in global scope. The graph scope will only show the data source it has access to.

2. 创建加载作业

Kafka加载器使用与标准GSQL加载作业相同的 创建加载作业语法。 用户需要使用DEFINE FILENAME语句声明一个FILENAME变量, 用于让加载器找到Kafka的配置文件。
同时, 用户也可以选择在RUN LOADING JOB语句中使用USING方法来设定一个新的文件地址。 RUN语句中的文件地址将覆盖之前在CREATE LOADING JOB中配置的文件地址。
下例展示了用于Kafka加载器的DEFINE FILENAME的语法。 在该语句中, $DATA_SOURCE_NAME参数表示Kafka的数据源名称以及一条指向配置文件的路径地址, 该配置文件包含了kafka集群的话题及分区信息。
DEFINE FILENAME filevar "=" [filepath_string | data_source_string];
data_source_string = $DATA_SOURCE_NAME":"<path_to_configfile>
示例: 加载一个Kafka数据源 k1 , 话题及分区信息配置文件地址为: "~/topic_partition1.conf":
DEFINE FILENAME f1 = "$k1:~/topic_partition1.conf";
Kafka Topic-Partition 配置文件
Kafka Topic-Partition配置文件用于告诉TigerGraph如何阅读Kafka中的数据。 与之前的数据源配置文件类似,该话题-分区配置文件也是以JSON格式保存的。 下面是一个例子:
topic_partition1.conf
{
"topic": "topicName1",
"partition_list": [
{
"start_offset": -1,
"partition": 0
},
{
"start_offset": -1,
"partition": 1
},
{
"start_offset": -1,
"partition": 2
}
]
}
"topic"参数值是必须的。 "partition_list"参数值则是可选的, 该可选参数可用于声明需要读取的话题及分区,以及开始点的偏移量。 如果"partition_list"值为空,则表示该话题下的所有分区都会被加载。 默认的起始点偏移量为 “-1”, 表示加载作业从最新的消息开始(例如从该话题的末尾开始加载)。 如果用户希望从话题的起始行开始加载, 则"start_offset"的参数值应该设为“-2”。
用户可以通过修改Kafka Topic-Partition文件的"default_start_offset"参数值来修改默认的偏移量。 例如:
# all partition will be used if no "partition_list" item
{
"topic": "topicName1"
}
# with empty "partition_list"
{
"topic": "topicName1",
"partition_list": []
}
# overwrite the default start offset
{
"topic": "topicName1",
"default_start_offset", 0
}
除了直接配置参数文件地址之外, 用户也可以选择直接将Topic-Partition 配置信息编辑成字符串, 例如:
DEFINE FILENAME f1 = "$k1:~/topic_partition_config.json";
DEFINE FILENAME f1 = "$k1:{\"topic\":\"zzz\",\"default_start_offset\":2,\"partition_list\":[]}";

3. 执行加载作业

Kafka加载器使用与标准GSQL执行加载作业相同的执行加载作业语法。 用户可以向每个文件名变量输入一个形似“DATA_SOURCE Var:topic_partition configure”一样的字符串, 用于覆盖加载作业中早先定义的文件名。 在下面的例子中, f1使用的是在CREATE LOADING JOB中赋予的值, 而f3和f4则使用在RUN语句中赋予的值。
RUN LOADING JOB job1 USING f1, f3="$k1:~/topic_part3.config", f4="$k1:~/topic_part4.config", EOF="true";
每次执行RUN LOADING JOB命令只能导入同一种类型的数据源。 也就是说,你不能在一次加载作业中同时导入源自Kafka集群的数据和源自普通数据文件中的数据。
一次加载作业中的所有文件名变量必须指向同一个DATA_SOURCE变量。
Kafka加载器有两种加载模式: 流模式(Streaming Mode)和EOF模式。 默认模式为流模式。 在流模式中, 加载器的加载动作只有在作业停止后才会停止。 而在EOF模式中, 加载器在完成读取所有的当前的Kafka消息后便会停止。
如果需要将模式修改为EOF模式,需要在RUN LOADING JOB语句后添加一个参数:
RUN LOADING JOB [-noprint] [-dryrun] [-n [i],j] jobname
[ USING filevar [="filepath_string"][, filevar [="filepath_string"]]*
[, CONCURRENCY="cnum"][,BATCH_SIZE="bnum"]][, EOF="true"]

管理加载作业

管理Kafka的加载作业与管理普通加载作业的方法相同。三个主要的管理命令为:
  • SHOW LOADING STATUS
  • ABORT LOADING JOB
  • RESUME LOADING JOB
例如: SHOW LOADING STATUS命令的语法如下:
SHOW LOADING STATUS job_id|ALL
如果想单独读取某个作业的情况, 用户需要在RUN LOADING JOB语句中添加job_id参数。 对于针对单独作业的SHOW命令来说, 下面的信息会显示出来:
  1. 1.
    对于每一个分区, 当前已加载的偏移量
  2. 2.
    平均加载速度
  3. 3.
    加载数据量
  4. 4.
    加载持续时间
详情请参考: 监控与管理加载作业

Kafka加载器实际案例

下面将演示一个通过Kafka加载器导入数据的案例:
USE GRAPH test_graph
DROP JOB load_person
DROP DATA_SOURCE k1
#create data_source kafka k1 = "kafka_config.json" for graph test_graph
CREATE DATA_SOURCE KAFKA k1 FOR GRAPH test_graph
SET k1 = "kafka_config.json"
# define the loading jobs
CREATE LOADING JOB load_person FOR GRAPH test_graph {
DEFINE FILENAME f1 = "$k1:topic_partition_config.json";
LOAD f1
TO VERTEX Person VALUES ($2, $0, $1),
TO EDGE Person2Comp VALUES ($2, $2, $1)
USING SEPARATOR=",";
}
# load the data
RUN LOADING JOB load_person