apache beam KafkaIO的使用(流处理IO)

举报
breakDawn 发表于 2020/08/18 09:41:12 2020/08/18
【摘要】 这里我以下面这个为例,弄1个demo这里我以下面这个为例,弄1个demo![image.png](http://image.huawei.cn/tiny-lts/v1/images/f02ee2683767ede57975_310x621.png@900-0-90-f.png)第一步建立option和pipeline```java PipelineOptions kafkaO...

这里我以下面这个为例,弄1个demo


这里我以下面这个为例,弄1个demo

![image.png](http://image.huawei.cn/tiny-lts/v1/images/f02ee2683767ede57975_310x621.png@900-0-90-f.png)


第一步建立option和pipeline

```java

        PipelineOptions kafkaOption = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PipelineOptions.class);

        Pipeline pipeline = Pipeline.create(kafkaOption);


```


## 如何消费beamKafaTest主题里的数据

首先要建立1个kafkaRead对象

```java

        KafkaIO.Read<String, String> kafkaRead =

                KafkaIO.<String, String>read()

                .withBootstrapServers("189.211.150.241:21005,189.211.150.246:21005,189.211.151.18:21005,189.211.151.36:21005")

                .withTopic("beamKafkaTest")

                .withKeyDeserializer(StringDeserializer.class)

                .withValueDeserializer(StringDeserializer.class);

```

boostrapServer的ip和端口视各自集群上的kafka配置而定

接着用apply进行组装,组装时要加上withoutMetaData作为最终的建立操作

```java

PCollection<KV<String, String>> pKv = pipeline.apply(kafkaRead.withoutMetadata());

```


## 设置窗口

数据集里的KV<String,String>指的就是消费的kafkaRecord里的key和value,这里直接帮我们转成KV了

* 我只需要里面的value整数值,因此需要把Key-Value转成Value。

* 同时我们希望以每5秒作为1个窗口将结果进行收集,因此需要使用windows类进行组装

* 同时要把5秒内的结果做个总和作为输出,要使用Sum这个类,并且要加上withoutDefaults,否则无法进行流处理。

```java

        PCollection<Integer> pIntSum = pKv.apply(MapElements.via(new SimpleFunction<KV<String, String>, Integer>() {

            @Override

            public Integer apply(KV<String, String> input) {

                return Integer.valueOf(input.getValue());

            }

        }))

                .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))

                .apply(Sum.integersGlobally().withoutDefaults());

```

## 生产消息到beamPrint主题

生产消息就比较简单了, 注意要先map成KV对象,才能使用KafkaIO.write

```java

pIntSum.apply(MapElements.<Integer, KV<Integer, String>>via(new SimpleFunction<Integer, KV<Integer, String>>() {

            // v转kv

            @Override

            public KV<Integer, String> apply(Integer input) {

                return KV.of((input!=null?input:0), "answer=" + (input!=null?input:0));

            }

        }))

                .apply(KafkaIO.<Integer, String>write()

                        .withBootstrapServers("189.211.150.241:21005,189.211.150.246:21005,189.211.151.18:21005,189.211.151.36:21005")

                        .withTopic("beamPrint")

                        .withKeySerializer(IntegerSerializer.class)

                        .withValueSerializer(StringSerializer.class));

```


## 测试

启动beamDemo程序,然后用Kafka客户端脚本打开生产者和消费者。  

5秒内在beamKafkaTest中输入1、2、3

然后可在beamPrint中看到输出:

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。