本文介绍kafka2.0的stream操作,并用java实现调用,实现了官网的3个例子。
最简Pipe
pipe相当于管道,连接2个topic的流通道,这里注意的就是消费者的客户端怎么写。
pipe
管道什么都不做:
1 | import org.apache.kafka.common.serialization.Serdes; |
生产者
生产者依然不变:
1 | private KafkaProducer<String, String> producer; |
消费者
现在到了jimo 的主题里,开启消费者去消费。
java客户端需要改为流的形式:
pom.xml
1 | <dependencies> |
1 | public class StreamConsumer { |
LineSplit
出现了个时间戳的问题,
1 | org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = telegraf, partition = 1, offset = 0, CreateTime = -1, serialized key size = -1, serialized value size = 283, headers = RecordHeaders(headers = [], isReadOnly = false) |
于是重写了生成时间戳的方法:
1 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
使用:
1 | properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName()); |
这样写出的offset就是随机的了,不是原来的递增?
生产者
生产者稍微改下:
1 | ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "hehe helo aska cdd" + i); |
LineSplit
1 | public class LineSplit { |
消费者
消费者只需修改group.id
1 | properties.setProperty("group.id", "streams-line-split"); |
Word Count
没什么问题,注意解析结果时要转成数字。
生产者
生产者改了下数据:
1 | final Random r = new Random(); |
WordCount
1 | final StreamsBuilder builder = new StreamsBuilder(); |
消费者
1 | properties.setProperty("group.id", "streams-wordcount"); |
参考
https://kafka.apache.org/20/documentation/streams/quickstart