0%

connect-elasticsearch

Kafka Topic 导入ES

Connector 全局配置

Connector Sink 配置

ES Sink 官方参考

{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"behavior.on.null.values": "IGNORE",
"errors.retry.timeout": "-1",
"tasks.max": "2",
"topics": "Topic_1,Topic_2",
"batch.size": "1000",
"connection.timeout.ms": "10000",
"transforms": "renameTopic,insertTS",
"key.ignore": "true",
"max.retries": "10000",
"retry.backoff.ms": "1000",
"transforms.renameTopic.replacement": "es_index_name",
"schema.ignore": "true",
"value.converter.schema.registry.url": "http://127.0.0.1:8081",
"transforms.renameTopic.regex": ".*",
"errors.tolerance": "all",
"transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"connection.url": "http://127.0.0.1:9200",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms.insertTS.timestamp.field": "messageTS",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

配置说明(properties 格式,各配置项含义见注释)


# 重新尝试次数和等待时间决定了多久后任务失败。
# 失败重新尝试次数。
max.retries=10000

# 缓冲区最大缓存大小,默认20000。
max.buffered.records=20000

# message value 为 null 时(tombstone 消息)的处理方式:
# ignore 表示忽略 value 为 null 的 message,不做任何操作
# delete 表示删除 ES 中与该 message key 对应的文档
# fail 表示任务直接报错停止
behavior.on.null.values=IGNORE

# 每次失败重试前的等待(退避)时间,此处为 1s。
retry.backoff.ms=1000

# 使用transforms对数据进行转换
transforms=renameTopic,insertTS

# 指定index名称
# 通过重命名topic名称实现指定index名称
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=.*
transforms.renameTopic.replacement=es_index_name

# 将kafka message中的timestamp转为指定字段
transforms.insertTS.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertTS.timestamp.field=messageTS