0%

KSQLDB 细节整理

除法问题

  • double 类型除不尽时结果为null
  • 复杂计算时,先乘再除

数据处理

  • 如果有多个job同时处理一个大的数据流,可以先对数据流进行清洗,对提升性能有明显效果。

数据格式

  • DELIMITED(逗号隔开)
  • JSON
  • JSON_SR(需要注册)
  • Avro(需要注册)
  • Protocol buffer (Protobuf)
  • KAFKA(Kafka自带序列号)

Group By问题

Group field需要展示时,使用AS_VALUE()进行修饰。

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE DETECTED_CLICKS AS
SELECT
IP_ADDRESS AS KEY1,
URL AS KEY2,
TIMESTAMP AS KEY3,
AS_VALUE(IP_ADDRESS) AS IP_ADDRESS,
AS_VALUE(URL) AS URL,
AS_VALUE(TIMESTAMP) AS TIMESTAMP
FROM CLICKS WINDOW TUMBLING (SIZE 2 MINUTES, RETENTION 3650 DAYS)
GROUP BY IP_ADDRESS, URL, TIMESTAMP
HAVING COUNT(IP_ADDRESS) = 1;

使用Window产生Null Value问题

在使用window时,我们一般会配合使用Having语句,当Having语句中包含比例时,可能会出现先满足条件后又转为不满足,因为Window的输出为Table,所以当不满足时,会生成一个只有key没有value的数据,以表示删除此条key对应的数据。
例如:当我们统计10分钟内,访问购物车页面占总访问的比例超过50%,并且购物车访问次数大于50次的用户时,可能某个用户前3分钟主要访问购物车,但是3分钟后几乎又不访问了。针对这种情况,会出现的结果是前3分钟统计到此用户达标,会被记录下来,但是随着后续的访问,次用户的购物车访问占比不到50%,所以又不达标了,此时会产生一条只有key没有value的kafka消息,已表示此用户又变为不达标情况。

Connect-jdbc Null Value异常

1
2
3
Caused by: org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: CrawlerListNew
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:125)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:62)

当我们使用Connector将ksql计算结果写入DB时,如果我们忽略key,而只需要写入value中的数据,此时遇到value为null的情况就会报上边的错误。因为kafka 中value为null的数据表示删除,所以此条数据应该做删除处理,但是我们却没有做任何处理。解决办法:我们将kafka中的key写入到db的一个字段内,这样就不会报错,同事我们有忽略了key,所以也不会做删除操作。

1
2
3
4
key.ignore=true
delete.enabled=true
pk.mode=record_key
pk.fields=KsqlKey