0%

connect-http

HTTP Sink 配置

最少配置

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"connector.class": "io.confluent.connect.http.HttpSinkConnector",
"http.api.url": "http://localhost:8288/api/messages",
"request.method": "POST",
"topics": "REASON_HTTP",
"tasks.max": "1",
"request.body.format": "json",
"confluent.topic.bootstrap.servers": "localhost:9092",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}

配置解释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 数据格式配置,默认string类型
request.body.format=string,json

# 接受请求的地址
http.api.url=http://localhost:8288/api/messages

# 需要处理的topic
topics=my_topicc

# 启动多少个任务
tasks.max=2

# kafka 地址
confluent.topic.bootstrap.servers=host:port

请求体

1
2
3
4
POST http://localhost:8288/api/messages
Content-Type: text/plain;charset=UTF-8

[{"UNAME":"Reason","UADDRESS":"keji rod","UEMAIL":"reason@newegg.com","UAGE":12}]

java 接收 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

@RestController
public class MyController {


@PutMapping(path = "/api/messages")
public String putMessage(@RequestBody String message){
System.out.println("put message:");
System.out.println(message);
return "success";
}

@PostMapping(path = "/api/messages")
public String postMessage(@RequestBody String message){
System.out.println("post message:");
System.out.println(message);
return "success";
}

}

1
2
3
4
string post message:
{UNAME=Reason, UADDRESS=keji rod, UEMAIL=reason@newegg.com, UAGE=12}
json post message:
[{"UNAME":"Reason","UADDRESS":"keji rod","UEMAIL":"reason@newegg.com","UAGE":12}]