java并行流的老生常谈与重新思考

Stream API是Java 8推出的新特性,眼看Java 9 (2017年7月份) 即将推出,为什么还来谈Stream呢?毕竟自从Java 8推出以来,关于流的具体使用的文章相当之多,因此本文也不打算重复描述Stream的使用方法,而是想谈谈很少被提及的一些基本问题。

1.Java 8引入lambda语法的设计初衷是什么?是为了解决Java语言为人诟病的语法繁琐问题吗?

Java 8以前,很多函数式编程语言支持lambda表达式,然而对于 Java 8来说,其设计初衷并不是为了把Java转变成一个函数式编程语言,也不完全是为了改善代码可读性,其根本目的只是为了流的API设计和引入的,而流的目的则是利用多核并行,提高代码性能。

2. 流又是如何提高性能的?

首先是管道流思想对性能的提升,管道流思想将对象的集合以及操作抽象成管道和流,流抽象了数据,管道抽象了操作。管道犹如生产线的工人,对数据进行处理,处理完一个产品工序将立即传送到下一道工序手中,而不是让第一个工人把所有产品处理完再把所有产品传送到下一个工人手中进行下一个工序。这样就减少了下游操作对上游数据的等待,从而实现了不同处理操作的同时进行,即并行。

可以举一个常见的案例:

例如我们要写一个爬虫,很多网站搜索结果是分页的,第一页~第N页,我们现在需要获取某电影网站所有页的电影,并输出。

public class Spider {
    static List<Movie> getAllMovies() {
        List<Movie> movies = new LinkedList<>();
        for (int i = 0; i < getPageTotal(); i++) {
            movies.addAll(getMovies(page));
        }
        return movies;
    }

    static List<Movie> getMovies(int page) {
    }

    static int getPageTotal() {
    }

    public static void main(String[] args) {
        for (Movie movie : getAllMovies()) {
            System.out.println(movie.toString());
        }
    }
}

以上代码简单明了,就是有个缺陷,必须得所有电影获取结束之后才能一次性输出电影,而我们知道爬虫网络操作很慢,可能等待几个小时后才会看到输出结果,显然不能这么写,那么不妨单独写一个边爬边输出的方法:

public class Spider {
    static List<Movie> getAllMovies() {
        List<Movie> movies = new LinkedList<>();
        for (int i = 0; i < getPageTotal(); i++) {
            movies.addAll(getMovies(page));
        }
        return movies;
    }

    static List<Movie> getAllMoviesAndOutput() {
        List<Movie> movies = new LinkedList<>();
        for (int i = 0; i < getPageTotal(); i++) {
            List<Movie> moviePageI = getMovies(page);
            movies.addAll(moviePageI);
            for (Movie movie : moviePageI) {
                System.out.println(movie.toString());
            }
        }
        return movies;
    }

    static List<Movie> getMovies(int page) {
    }

    static int getPageTotal() {
    }

    public static void main(String[] args) {
        getAllMoviesAndOutput();
    }
}

如上代码通过新增了一个方法,在每一页结果获取后,立即输出,然而不足有二:

  1. getAllMoviesAndOutput()方法和getAllMovies()方法重复代码很多,代码复用性低
  2. 如果需求改变,不需要输出,而是存入数据库,岂不是得再写一个方法

新增需求的方法就是对数据的后续操作,其实和单纯获取的代码只是在中间插入一些代码而已,所以这也是一个AOP问题。

那么Stream是如何解决该问题的呢?

    static Stream<Movie> getAllMovies() {
        return IntStream.range(0, getPageTotal()).flatMap(page -> getMovies(page).stream());
    }

这段获取所有电影的函数返回一个Stream对象,值得注意的是Stream中的非终止操作不会立即进行计算的,而是会延迟计算(Lazy evaluation),一直遇到终止操作才会真正触发计算。

这也可以用生产线模型来解释,我们的爬虫生产线上游产生电影对象,下游可能会输出,可能会数据库存储,还可能在中间对电影对象进行二次过滤,转换等各类操作,上游生产线不会立即爬取所有电影,而是等下游所有生产线操作确定以后,才会触发电影的爬取工作,每爬取一部电影,就会传递给下游。而这个操作确定的过程对于代码来说则是瞬间完成的,毕竟代码运行前逻辑便已经确定,但是这样就做到了完美的解耦。

3.流解决了数据与计算的解耦,那流具体是怎么实现的?此外C++中传函数地址,Java中传函数对象不也可以实现这种解耦吗?

确实,如果将函数对象传进来,将对数据的计算操作封装成一个函数,也可以做到计算与数据的解耦。例如下面的代码:

public class Spider {
    static List<Movie> getAllMoviesAndOperate(Consumer<Movie> operation) {
        List<Movie> movies = new LinkedList<>();
        for (int i = 0; i < getPageTotal(); i++) {
            List<Movie> moviePageI = getMovies(page);
            movies.add(moviePageI);
            for (Movie movie : moviePageI) {
                operation.accept(movie);
            }
        }
        return movies;
    }

    static List<Movie> getMovies(int page) {
    }

    static int getPageTotal() {
    }

    public static void main(String[] args) {
        getAllMoviesAndOperate(System.out::println);
    }
}

通过将函数作为一个对象,对数据应用程序员选择的函数,确实实现了数据与操作的解耦,事实上Java 8流的实现也类似于此,但是这种写法比较繁琐,且抽象层次不够高。

其缺陷是,如果我们需要做的Operation的需求不断变更,既要输出又要输出前对属性做一些修改,甚至是非常复杂的操作,岂不是要自行封装一个复杂的操作函数,然后再将该函数作为参数。这样程序的复杂度就集中到该函数上,这个流水线就变成了二工序流水线,第二个工序十分复杂,这与最初解耦降低复杂度的想法南辕北辙了,解决方案也很简单,可以让第二个函数也接受一个函数,作为第三个工序,依此类推。其实这就已经实现了一个流的管道传递的雏形了。

4.lambda表达式既然是为了流而产生的,那是怎么具体应用到流中的呢?

通过上一个问题的描述,答案已经很显然了,流的具体实现,强烈依赖于将函数作为参数,而在 Java 8 之前,即使声明一个最简单的函数,也需要三行打码,包含函数命名,函数参数类型,参数命名,返回值等,这已经阻碍了流对函数的大量创建的需要,lambda表达式则可以简洁的表示一个函数,因此被采用。

5.匿名内部类不也是可以实现lambda表达式的作用吗,lambda表达式不就是一个语法糖吗?

虽然语法复杂了些,但是一个只有一个函数的匿名内部类确实可以实现lambda表达式的主要功能,但存在两种意义上的区别:

  • 一者是显式的语义区别。匿名内部类会引入新的命名作用域,在其中使用this获取的是匿名类自身,而lambda表达式则不会引入新的作用域,降低了不必要的名称查找复杂性。另外一点,匿名内部类会确保创建唯一标识的新对象,而lambda表达式则可能没有,这样可以让平台使用更高效的实现策略。
  • 二者是设计哲学上的区别。一种更高层次的抽象永远不能被嘲笑为语法糖。譬如高级语言是汇编语言的语法糖吗?面向对象是面向过程的语法糖吗?一种高层抽象看似只是简化了代码,实则改变了编程思维方式,因而可以设计出复杂度更高的代码。具体到lambda表达式,并行流的设计产生与lambda表达式密不可分,代码简化只是附加好处。

6.流与多核,并行是什么关系?

流(Stream)的API有两种意义上的并行,一种就是前面所提到的管道过滤器的并行使用,这是纵向的并行,即管道之间相互并行,每个工序同时进行,另一种就是横向的并行,流的分割,流其中的数据元素并行输入,被分割迭代。这是流具体实现的另一个重要步骤。

Java 流使用的是fork-join框架,对与输入流反复分割,直到特定条件为止(由分割器决定,用户可以自行覆写分割器),根据分割器的分配,Java 8 Stream会将流元素分配到不同线程,每个线程处理部分数据,并将处理完的数据两份两份的迭代进行收集,收集策略由收集器决定。

譬如对于给定的整数集合求和,Java 8 Stream API 已经实现了求和函数,该实现会将整数均分为两份,没两份又各自均分成两部分,反复迭代,默认会分成处理器核心个数相等的份数,然后将这些数每一份分配到线程池中的一个线程,计算部分数的和,最后计算完毕,再同样的对结果进行迭代求和。

因此流可以充分并行,利用多核性能。

7.流不就是多线程处理数据吗,我们直接编写多线程代码不也可以实现吗?

首先,自己编写多线程代码确实也可以利用多核性能。但是,并行流的主要目标是,让最基本的计算也能够具有并行能力,自己编写很多代码有些小题大作,而且由于编码复杂,程序员也不愿意在并行上花功夫。

并行流处理的场景也比自己编写多线程代码窄一些,并行流主要处理场景是将参数不同的函数分发到不同线程上,线程不关心也无法访问外部变量,这正是编程中很常见的一种场景,因为引入同步会极大增加复杂性。因此并行流是为最常见并行任务而生的,为了促进程序员写出更多有可并行能力Java代码。

文中示例代码使用Stream API改写

public class Spider {
    static Stream<Movie> getAllMovies() {
        return IntStream.range(0, getPageTotal()).flatMap(page -> getMovies(page).stream());
    }

    static List<Movie> getMovies(int page) {
    }

    static int getPageTotal() {
    }

    public static void main(String[] args) {
        getAllMovies().forEach(movie -> System.out::println);
    }
}

参考文献

《Mastering Lambdas: Java Programming in Multicore Word》-Maurice Naftalin

共有 0 条评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注