Java Stream管道数据处理操作

  1. Java Stream管道数据处理操作
  2. Stream 构成与创建
    1. java.util.Collection.stream() 方法用集合创建流
    2. java.util.Arrays.stream(T[] array)方法用数组创建流
    3. Stream的静态方法:of()、iterate()、generate()
    4. stream和parallelStream的简单区分
  3. 无状态(Stateless)操作
    1. filter:筛选,是按照一定的规则校验流中的元素,将符合条件的元素提取到新的流中的操作。
    2. 映射(map、flatMap、peek)
      1. map:一个元素类型为 T 的流转换成元素类型为 R 的流,这个方法传入一个Function的函数式接口,接收一个泛型T,返回泛型R,map函数的定义,返回的流,表示的泛型是R对象;
      2. flatMap:接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。
      3. peek:peek 操作接收的是一个 Consumer 函数。顾名思义 peek 操作会按照 Consumer 函数提供的逻辑去消费流中的每一个元素,同时有可能改变元素内部的一些属性。
      4. peek AND map
      5. mapToInt、mapToLong、mapToDouble、flatMapToDouble、flatMapToInt、flatMapToLong
  4. 无序化(unordered)
  • 有状态(Stateful)操作
    1. distinct:返回由该流的不同元素组成的流(根据 Object.equals(Object));distinct()使用hashCode()和equals()方法来获取不同的元素。因此,我们的类必须实现hashCode()和equals()方法。
    2. sorted:返回由该流的元素组成的流,并根据自然顺序排序
    3. limit:获取流中n个元素返回的流
    4. skip:在丢弃流的第一个n元素之后,返回由该流的其余元素组成的流。
  • 短路(Short-circuiting)操作
    1. anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true;
    2. allMatch:Stream 中全部元素符合传入的 predicate,返回 true;
    3. noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true.
    4. findFirst:用于返回满足条件的第一个元素(但是该元素是封装在Optional类中)
    5. findAny:返回流中的任意元素(但是该元素也是封装在Optional类中)
  • 非短路(Unshort-circuiting)操作
    1. forEach:该方法接收一个Lambda表达式,然后在Stream的每一个元素上执行该表达式
    2. forEachOrdered:该方法接收一个Lambda表达式,然后按顺序在Stream的每一个元素上执行该表达式
    3. toArray:返回包含此流元素的数组;当有参数时,则使用提供的generator函数分配返回的数组,以及分区执行或调整大小可能需要的任何其他数组
    4. reduce:方法接收一个函数作为累加器,数组中的每个值(从左到右)开始缩减,最终计算为一个值
    5. collect:称为收集器,是一个终端操作,它接收的参数是将流中的元素累积到汇总结果的各种方式
    6. max:根据提供的Comparator返回此流的最大元素
    7. min:根据提供的Comparator返回此流的最小元素
    8. count:返回此流中的元素计数
  • Optional
    1. Optional使用场景
    2. 常用方法使用
      1. empty()
      2. of()
      3. ofNullable()
      4. isPresent()
      5. get()
      6. ifPresent()
      7. filter()
      8. map()
      9. flatMap()
      10. orElse()
      11. orElseGet()
      12. orElseThrow()
  • Java Stream管道数据处理操作

    在使用的过程中分为三个阶段。
    github

    • 第一阶段(图中蓝色):将集合、数组、或行文本文件转换为java Stream管道流
    • 第二阶段(图中虚线部分):管道流式数据处理操作,处理管道中的每一个元素。上一个管道中的输出元素作为下一个管道的输入元素。
    • 第三阶段(图中绿色):管道流结果处理操作。

    中间操作可分为:

    • 无状态(Stateless)操作:指元素的处理不受之前元素的影响
    • 有状态(Stateful)操作:指该操作只有拿到所有元素之后才能继续下去

    终结操作可分为:

    • 短路(Short-circuiting)操作:指遇到某些符合条件的元素就可以得到最终结果
    • 非短路(Unshort-circuiting)操作:指必须处理完所有元素才能得到最终结果

    github

    Stream 构成与创建

    java.util.Collection.stream() 方法用集合创建流

    List<String> list = Arrays.asList("hello","world","stream");
    //创建顺序流
    Stream<String> stream = list.stream();
    //创建并行流
    Stream<String> parallelStream = list.parallelStream();

    java.util.Arrays.stream(T[] array)方法用数组创建流

    String[] array = {"h", "e", "l", "l", "o"};
    Stream<String> arrayStream = Arrays.stream(array);

    Stream的静态方法:of()、iterate()、generate()

    Stream<Integer> stream1 = Stream.of(1, 2, 3, 4, 5, 6);

    Stream<Integer> stream2 = Stream.iterate(0, (x) -> x + 2).limit(3);
    stream2.forEach(System.out::println);

    Stream<Double> stream3 = Stream.generate(Math::random).limit(3);
    stream3.forEach(System.out::println)

    stream和parallelStream的简单区分

    tream是顺序流,由主线程按顺序对流执行操作,而parallelStream是并行流,内部以多线程并行执行的方式对流进行操作,需要注意使用并行流的前提是流中的数据处理没有顺序要求(会乱序,即使用了forEachOrdered)。

    除了直接创建并行流,还可以通过parallel()把顺序流转换成并行流

    Optional<Integer> findFirst = list.stream().parallel().filter(x->x>4).findFirst();

    无状态(Stateless)操作

    filter:筛选,是按照一定的规则校验流中的元素,将符合条件的元素提取到新的流中的操作。

    Stream<T> filter(Predicate<? super T> predicate);

    映射(map、flatMap、peek)

    map:一个元素类型为 T 的流转换成元素类型为 R 的流,这个方法传入一个Function的函数式接口,接收一个泛型T,返回泛型R,map函数的定义,返回的流,表示的泛型是R对象;
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);
    flatMap:接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。
    <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)

    例子:

    List<String> list1 = Arrays.asList("m,k,l,a", "1,3,5,7");
    List<String> listNew = list1.stream().flatMap(s -> {
    // 将每个元素转换成一个stream
    String[] split = s.split(",");
    Stream<String> s2 = Arrays.stream(split);
    return s2;
    }).collect(Collectors.toList());

    peek:peek 操作接收的是一个 Consumer 函数。顾名思义 peek 操作会按照 Consumer 函数提供的逻辑去消费流中的每一个元素,同时有可能改变元素内部的一些属性。
    Stream<T> peek(Consumer<? super T> action);

    这里因为peek是中间操作层,因此如果以他结尾,则无法完成事件消费。

    Stream<String> stream = Stream.of("hello", "felord.cn");
    stream.peek(System.out::println);

    Stream<String> stream = Stream.of("hello", "felord.cn");
    stream.peek(System.out::println).collect(Collectors.toList());

    peek AND map
    • peek 操作 一般用于不想改变流中元素本身的类型或者只想元素的内部状态时;
    • map 则用于改变流中元素本身类型,即从元素中派生出另一种类型的操作。
    mapToInt、mapToLong、mapToDouble、flatMapToDouble、flatMapToInt、flatMapToLong

    以上这些操作是map和flatMap的特例版,也就是针对特定的数据类型进行映射处理。

    IntStream mapToInt(ToIntFunction<? super T> mapper);
    LongStream mapToLong(ToLongFunction<? super T> mapper);
    DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
    IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
    LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
    DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);

    无序化(unordered)

    unordered()操作不会执行任何操作来显式地对流进行排序。它的作用是消除了流必须保持有序的约束,从而允许后续操作使用不必考虑排序的优化。

    对于顺序流,顺序的存在与否不会影响性能,只影响确定性。如果流是顺序的,则在相同的源上重复执行相同的流管道将产生相同的结果;
    如果是非顺序流,重复执行可能会产生不同的结果。 对于并行流,放宽排序约束有时可以实现更高效的执行。
    在流有序时, 但用户不特别关心该顺序的情况下,使用 unordered 明确地对流进行去除有序约束可以改善某些有状态或终端操作的并行性能。

    有状态(Stateful)操作

    distinct:返回由该流的不同元素组成的流(根据 Object.equals(Object));distinct()使用hashCode()和equals()方法来获取不同的元素。因此,我们的类必须实现hashCode()和equals()方法。

    Stream<T> distinct();

    就是去重

    sorted:返回由该流的元素组成的流,并根据自然顺序排序

    该接口有两种形式:无参和有参数

    Stream<T> sorted();
    Stream<T> sorted(Comparator<? super T> comparator);

    那区别其实就在于:传入比较器的参数,可以自定义这个比较器,即自定义比较规则。

    limit:获取流中n个元素返回的流

    mysql的中的limit函数一样的效果,返回指定个数的元素流。

    Stream<T> limit(long maxSize);

    skip:在丢弃流的第一个n元素之后,返回由该流的其余元素组成的流。

    跳过第n个元素,返回其后面的元素流;

    Stream<T> skip(long n);

    短路(Short-circuiting)操作

    anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true;

    boolean anyMatch(Predicate<? super T> predicate);

    allMatch:Stream 中全部元素符合传入的 predicate,返回 true;

    boolean allMatch(Predicate<? super T> predicate);

    noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true.

    boolean noneMatch(Predicate<? super T> predicate);

    findFirst:用于返回满足条件的第一个元素(但是该元素是封装在Optional类中)

    Optional<T> findFirst();

    findAny:返回流中的任意元素(但是该元素也是封装在Optional类中)

    Optional<T> findAny();

    findAny会每次按顺序返回第一个元素。那这个时候,可能会认为findAny与findFirst方法是一样的效果。其实不然,findAny()操作,返回的元素是不确定的,对于同一个列表多次调用findAny()有可能会返回不同的值。使用findAny()是为了更高效的性能。如果是数据较少,串行地情况下,一般会返回第一个结果,如果是并行的情况,那就不能确保是第一个。

    非短路(Unshort-circuiting)操作

    forEach:该方法接收一个Lambda表达式,然后在Stream的每一个元素上执行该表达式

    void forEach(Consumer<? super T> action);

    不同于普通的for循环,他在接收外部参数的时候,要求外部变量必须是最终的,不可变的,所以如果我们想要对其进行修改,那是不可能的!如果必须这么使用,可以将外部变量,移至表达式之中使用才行!
    或者使用原子类来包装这些需要改变的外部参数。

    forEachOrdered:该方法接收一个Lambda表达式,然后按顺序在Stream的每一个元素上执行该表达式

    void forEachOrdered(Consumer<? super T> action);

    该功能其实和forEach是很相似的,也是循环操作!那唯一的区别,就在于forEachOrdered是可以保证循环时元素是按原来的顺序逐个循环的!

    有的时候,forEachOrdered也是不能百分百保证有序!

    Stream.of("AAA,","BBB,","CCC,","DDD,").parallel().forEach(System.out::print);

    在并行流时,由于是多线程处理,其实还是无法保证有序操作的!

    toArray:返回包含此流元素的数组;当有参数时,则使用提供的generator函数分配返回的数组,以及分区执行或调整大小可能需要的任何其他数组

    Object [] toArray();
    <A> A[] toArray(IntFunction<A[]> generator);

    reduce:方法接收一个函数作为累加器,数组中的每个值(从左到右)开始缩减,最终计算为一个值


    Optional<T> reduce(BinaryOperator<T> accumulator);

    T reduce(T identity, BinaryOperator<T> accumulator);

    <U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner);
    • 1个参数:累加处理的纯函数
    • 2个参数:基础上增加一个基础值
    • 3个参数:在基础上,增加了一个针对并行流的一个纯函数,组合器的作用,是对参数2中的数据进行处理,产生的结果进行一次合并器处理。

    collect:称为收集器,是一个终端操作,它接收的参数是将流中的元素累积到汇总结果的各种方式

    <R, A> R collect(Collector<? super T, A, R> collector);

    <R> R collect(Supplier<R> supplier,
    BiConsumer<R, ? super T> accumulator,
    BiConsumer<R, R> combiner);

    第一种方式会比较经常使用到,也比较方便使用,现在先看一看里面常用的一些方法:

    工厂方法 返回类型 用于
    toList List<T> 把流中所有元素收集到List中
    toSet Set<T> 把流中所有元素收集到Set中,删除重复项
    toCollection Collection<T> 把流中所有元素收集到给定的供应源创建的集合中
    Counting Long 计算流中元素个数
    SummingInt Integer 对流中元素的一个整数属性求和
    averagingInt Double 计算流中元素integer属性的平均值
    Joining String 连接流中每个元素的toString方法生成的字符串
    maxBy Optional<T> 一个包裹了流中按照给定比较器选出的最大元素的optional。如果为空返回的是Optional.empty()
    minBy Optional<T> 一个包裹了流中按照给定比较器选出的最小元素的optional。如果为空返回的是Optional.empty()
    Reducing 归约操作产生的类型 从一个作为累加器的初始值开始,利用binaryOperator与流中的元素逐个结合,从而将流归约为单个值
    collectingAndThen 转换函数返回的类型 包裹另一个转换器,对其结果应用转换函数
    groupingBy Map<K,List<T>> 根据流中元素的某个值对流中的元素进行分组,并将属性值做为结果map的键
    partitioningBy Map<Boolean,List<T>> 根据流中每个元素应用谓语的结果来对项目进行分区

    第二种方式看起来跟reduce的三个入参的方法有点类似,也可以用来实现filter、map等操作!

    • 第一个参数:构建新数据结构用于后面使用
    • 第二个参数:用于对循环stream数据的流循环处理,如果在并行是则会对每个数据分线程处理第一个参数数据
    • 第三个参数:只有在并行流中调用。完成第二个参数调用后,就会调用该合并器,与reduce的三个参数一致。

    max:根据提供的Comparator返回此流的最大元素

    Optional<T> max(Comparator<? super T> comparator);

    min:根据提供的Comparator返回此流的最小元素

    Optional<T> min(Comparator<? super T> comparator);

    count:返回此流中的元素计数

    long count();

    Optional

    这是对NullPointerException的异常问题专门提供的方案,在许多语言中都会以不同的方式进行呈现。
    Optional 只是一个容器,它可以保存一些类型的值或者null。

    修饰符和类型 方法和说明
    static <T> Optional<T> empty()返回一个空Optional实例。
    boolean equals(Object obj) 指示其他某个对象是否“等于”此Optional。
    Optional<T> filter(Predicate<? super T> predicate)如果存在一个值,并且该值与给定的谓词匹配,则返回一个Optional描述值的描述,否则返回一个empty Optional。
    <U> Optional<U> flatMap(Function<? super T,Optional<U>> mapper)如果存在值,则将提供的Optional-bearing映射函数应用于该值,返回该结果,否则返回empty Optional。
    T get()如果此值存在Optional,则返回该值,否则抛出NoSuchElementException。
    int hashCode()返回当前值的哈希码值(如果有);如果没有值,则返回0(零)。
    void ifPresent(Consumer<? super T> consumer)如果存在值,请使用该值调用指定的使用者,否则不执行任何操作。
    boolean isPresent()true如果存在值,则返回,否则返回false。
    <U> Optional<U> map(Function<? super T,? extends U> mapper)如果存在值,则将提供的映射函数应用于该值,如果结果为非null,则返回Optional描述结果的描述。
    static <T> Optional<T> of(T value)返回Optional具有指定的当前非空值的。
    static <T> Optional<T> ofNullable(T value)返回Optional描述指定值的描述,如果不为null,则返回null Optional。
    T orElse(T other)返回值(如果存在),否则返回other。
    T orElseGet(Supplier<? extends T> other)返回值(如果存在),否则调用other并返回该调用的结果。
    <X extends Throwable> / T orElseThrow(Supplier<? extends X> exceptionSupplier)返回包含的值(如果存在),否则抛出异常,由提供的供应商创建。
    String toString()返回此Optional的非空字符串表示形式,适用于调试。

    Optional使用场景

    String isocode = user.getAddress().getCountry().getIsocode().toUpperCase();

    //将上面的代码进行判空处理
    if (user != null) {
    Address address = user.getAddress();
    if (address != null) {
    Country country = address.getCountry();
    if (country != null) {
    String isocode = country.getIsocode();
    if (isocode != null) {
    isocode = isocode.toUpperCase();
    }
    }
    }
    }

    可以转换成如下写法:

    Optional.ofNullable(user)
    .map(User::getAddress)
    .map(Address::getCountry)
    .map(Country::getIsocode)
    .ifPresent(s-> s.toUpperCase());

    不仅仅是美观,而是在数据整体的包装且进行数据流传递处理。

    • 当使用值为空的情况,并非源于报错时产生,可以使用Optional(因为有错误的情况,肯定是不正常的,需要处理的)
    • 对于一个对象,我们需要做判空、过滤、某些校验的时候,可以使用Optional

    常用方法使用

    empty()

    empty 方法返回一个不包含值的 Optional 实例, 注意不保证返回的 empty 是单例, 不要用 == 比较。

    public static<T> Optional<T> empty();

    of()

    返回一个 Optional 实例;代表指定的非空值, 如果传入 null 会立刻抛出空指针异常。

    public static <T> Optional<T> of(T value);

    ofNullable()

    返回一个 Optional 实例, 如果指定非空值则实例包含非空值, 如果传入 null 返回不包含值的 empty

    public static <T> Optional<T> ofNullable(T value);

    isPresent()

    isPresent 用来判断实例是否包含值, 如果不包含非空值返回 false, 否则返回 true

    public boolean isPresent();

    get()

    get 方法, 如果实例包含值则返回当前值, 否则抛出 NoSushElementException 异常

    public T get();

    ifPresent()

    ifPresent 方法作用是当实例包含值时, 来执行传入的 Consumer, 比如调用一些其他方法

    public void ifPresent(Consumer<? super T> consumer);

    filter()

    filter 方法用于过滤不符合条件的值, 接收一个 Predicate 参数, 如果符合条件返回代表值的 Optional 实例, 否则返回 empty

    map()

    map 方法是链式调用避免空指针的核心方法, 当实例包含值时, 对值执行传入的 Function 逻辑, 并返回一个代表结果值的新的 Optional 实例;也就是将 optional 中的对象转成 其他对象,或者修改对象中的属性。

    flatMap()

    flatMap方法是将 optional 中的对象转成 optional 对象,或者修改对象中的属性;与map方法类似。
    不同之处在于,前者返回的数据,后者返回的数据需要封装在Stream中。

    orElse()

    orElse方法是如果实例包含值, 那么返回这个值, 否则返回指定的默认值, 如null

    public T orElse(T other);

    orElseGet()

    orElseGet方法是如果实例包含值, 返回这个值;否则,它会执行作为参数传入的 Supplier(供应者) 函数式接口,并将返回其执行结果。

    public T orElseGet(Supplier<? extends T> other);

    orElse与orElseGet不同之处:

    public void givenPresentValue_whenCompare_thenOk() {
    User user = new User("john@gmail.com", "1234");
    logger.info("Using orElse");
    User result = Optional.ofNullable(user).orElse(createNewUser());
    logger.info("Using orElseGet");
    User result2 = Optional.ofNullable(user).orElseGet(() -> createNewUser());
    }

    orElseThrow()

    如果实例不包含值, 调用传入的 Supplier 参数, 生成一个异常实例并抛出.这个方法通常与全局异常处理器一起使用, 当参数或者其他情况获取不到值时, 抛出自定义异常, 由异常处理器处理成通用返回结果, 返回给前端。

    Optional.ofNullable(tempList)
    .orElseThrow(() -> runtimeException)
    .forEach(t -> System.out.println("2:" + t));