4+

Flink是下一代大数据计算平台,可处理流计算和批量计算。《Flink-1.9流计算开发:十三、min、minBy、max、maxBy函数》cosmozhu写的本系列文章的第十三篇。通过简单的DEMO来演示min、minBy、max、maxBy函数执行的效果 。

需求

本篇文章我们来区分min(max)与minBy(maxBy)之间的区别,下面案例是每10秒计算一次最近1分钟的最小值订单。

解决方案


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量}
        DataStreamSource<Tuple2<String, Integer>> orderSource = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private volatile boolean isRunning = true;
                    private final Random random = new Random();

                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (isRunning) {
                            TimeUnit.SECONDS.sleep(1);
                            Tuple2<String, Integer> t = Tuple2.of(TYPE[random.nextInt(TYPE.length)], random.nextInt(1000));
                            LOG.info("提交数据:"+t);
                            ctx.collect(t);
                        }
                    }
                    @Override
                    public void cancel() {
                        isRunning = false;
                    }

                }, "order-info");

        orderSource
        .timeWindowAll(Time.minutes(1),Time.seconds(10))
        .min(1)
//      .minBy(1)
        .print();

        env.execute("Flink Streaming Java API Skeleton");
    }

执行效果

执行min函数,我们可以看出min函数确实返回了最小值,但是最小值前面对应的商品名称却对应不上。

15:52:20,261 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,542)
15:52:21,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,331)
15:52:22,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,894)
15:52:23,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,791)
15:52:24,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,836)
15:52:25,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,198)
15:52:26,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,221)
15:52:27,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,309)
15:52:28,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,553)
15:52:29,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,969)
2> (梨,198)
15:52:30,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,981)
15:52:31,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,483)
15:52:32,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,381)
15:52:33,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,982)
15:52:34,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,738)
15:52:35,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,654)
15:52:36,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,124)
15:52:37,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,703)
15:52:38,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,996)
15:52:39,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,127)
3> (梨,124)
15:52:40,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,249)
15:52:41,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,705)
15:52:42,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,37)
15:52:43,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,647)
15:52:44,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,842)
15:52:45,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,475)
15:52:46,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,999)
15:52:47,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,994)
15:52:48,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,417)
15:52:49,282 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,830)

执行minBy函数,我们可以看出minBy返回的最小值,并且对应的商品名称也是正确的


15:59:06,657 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,935)
15:59:07,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,638)
15:59:08,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,485)
15:59:09,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,720)
4> (梨,485)
15:59:10,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,407)
15:59:11,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,353)
15:59:12,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,76)
15:59:13,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,629)
15:59:14,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,974)
15:59:15,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,263)
15:59:16,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,840)
15:59:17,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,220)
15:59:18,684 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,27)
15:59:19,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,94)
1> (苹果,27)
15:59:20,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,733)
15:59:21,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,390)
15:59:22,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,766)
15:59:23,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,321)
15:59:24,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(苹果,784)
15:59:25,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(梨,781)
15:59:26,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(火龙果,459)
15:59:27,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,481)
15:59:28,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(西瓜,91)
15:59:29,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,663)
2> (苹果,27)
15:59:30,685 INFO  fun.cosmozhu.session13.StreamTest                             - 提交数据:(葡萄,183)

小结

从上面的案例我们可以分析得出:

  1. min只返回计算的最小值,而最小值对应的其他数据不保证正确。
  2. minBy返回计算的最小值,并且最小值对应的其他数据是保证正确的。

max和maxBy与其相似

代码地址

https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session13/StreamTest.java

作者:cosmozhu --90后的老父亲,专注于保护地球的程序员
个人网站:https://www.cosmozhu.fun
欢迎转载,转载时请注明出处。