Java 函数式编程

前些年 Scala 大肆流行,打出来 Java 颠覆者的旗号,究其底气来源,无非是函数式和面向对象的“完美结合”,各式各样的“语法糖”,但其过高的学习门槛,又给了新来者当头一棒。

随着 Java8 的发布,Lambda 特性的引入,之前的焦灼局面是否有所转变,让我们一起揭开 Java 函数式编程的面纱:

  1. 面向对象 VS 函数式
  2. FunctionalInterface 和 Lambda
  3. 类库的升级改造(默认方法、静态方法、Stream、Optional)
  4. Lambda 下模式的进化
  5. Lambda 下并发程序

1. 面向对象 VS 函数式编程

一句话总结两种的关系:面向对象编程是对数据进行抽象;而函数式编程是对行为进行抽象。

在现实世界中,数据和行为并存,程序也应如此,可喜可贺的是在 Java 世界中,两者也开启了融合之旅。

首先思考一个问题, 在 Java 编程中,我们如何进行行为传递,例如我们需要打印线程名称和当前时间,并将该任务提交到线程池中运行,会有哪些方法?

方法 1:新建 class Task 实现 Runnable 接口

1
2
3
4
5
6
7
public class Task implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms");
}
}
executorService.submit(new Task());

方法 2:匿名内部类实现 Runnable 接口

1
2
3
4
5
6
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms");
}
});

方法 3:使用 Lambda 表达式

1
executorService.submit(()-> System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms"));

方法 4:使用方法引用

1
2
3
4
5
6
 private void print(){
System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms");
}
{
executorService.submit(this::print);
}

通过上面不同的行为传递方式,能够比较直观的体会到随着函数式特性的引入,行为传递少了很多样板代码,增加了一丝灵活;可见Lambda表达式是一种紧凑的、传递行为的方式。

2. FunctionalInterface 和 Lambda

Java 函数式编程,只有两个核心概念:

FunctionalInterface(函数接口)是只有一个抽象方法的接口,用作 Lambda 表达式的类型。

Lambda 表达式,及要传递的行为代码,更像是一个匿名函数(当然 java 中并没有这个概念),将行为像数据那样进行传递。

换个好理解但是不正规的说法,FunctionalInterface 为类型,Lambda 表达式为值;我们可以将一个 Lambda 表达式赋予一个符合 FunctionalInterface 要求的接口变量(局部变量、方法参数)。

2.1. Lambda 表达式

先看几个 Lambda 表达式的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 不包含参数,用()表示没有参数
// 表达式主体只有一个语句,可以省略{}
Runnable helloWord = () -> System.out.println("Hello World");

// 表达式主体由多个语句组成,不能省略{}
Runnable helloWords = () -> {
System.out.println("Hello");
System.out.println("Word");
System.out.println("Word");
};

// 表达式中只有一个参数,可以省略()
Consumer<String> infoConsumer = msg -> System.out.println("Hello " + msg);

// 表达式由多个参数组成,不可省略()
BinaryOperator<Integer> add1 = (Integer i ,Integer j) -> i + j;

// 编译器会进行类型推断,在没有歧义情况下可以省略类型声明,但是不可省略()
BinaryOperator<Integer> add2 = (i, j) -> i + j;

综上可见,一个 Lambda 表达式主要由三部分组成:

  1. 参数列表
  2. 箭头分隔符(->)
  3. 主体,单个表达式或语句块

我们在使用匿名内部类时有一些限制:引用方法中的变量时,需要将变量声明为 final,不能为其进行重新赋值,如下:

1
2
3
4
5
6
7
final String msg = "World";
Runnable print = new Runnable() {
@Override
public void run() {
System.out.println("Hello" + msg);
}
};

在 Java8 中放松了这个限制,可以引用非 final 变量,但是该变量在既成事实上必须是 final 的,虽然无需将变量声明为 final,在 Lambda 表达式中,也无法用作非最终态变量,及只能给该变量赋值一次(与用 final 声明变量效果相同)。

2.2 FunctionalInterface

FunctionalInterface,只有一个抽象方法的接口就是函数式接口,接口中单一方法命名并不重要,只要方法签名与 Lambda 表达式的类型匹配即可。

Java 内置了常用函数接口如下:

1
2
3
4
5
1. Predicate<T>

参数类型:T
返回值:boolean
示例:Predicate<String> isAdmin = name -> "admin".equals(name);
1
2
3
4
5
2. Consumer<T>

参数:T
返回值:void
示例:Consumer<String> print = msg -> System.out.println(msg);
1
2
3
4
5
3. Function<T,R>

参数:T
返回值:R
示例:Function<Long, String> toStr = value -> String.valueOf(value);
1
2
3
4
5
4. Supplier<T>

参数:none
返回值:T
示例:Supplier<Date> now = () -> new Date();
1
2
3
4
5
5. UnaryOperator<T>

参数:T
返回值:T
示例:UnaryOperator<Boolean> negation = value -> !value.booleanValue();
1
2
3
4
5
6. BinaryOperator<T>

参数:(T, T)
返回值:T
示例:BinaryOperator<Integer> intDouble = (i, j) -> i + j;
1
2
3
4
5
7. Runnable

参数:none
返回值:void
示例:Runnable helloWord = () -> System.out.println("Hello World");
1
2
3
4
5
8. Callable<T>

参数:nont
返回值:T
示例:Callable<Date> now1 = () -> new Date();

当然我们也可以根据需求自定义函数接口,为了保证接口的有效性,可以在上面添加 @FunctionalInterface 注解,该注解会强制 javac 检测一个接口是否符合函数式接口的规范,例如:

1
2
3
4
5
@FunctionalInterface
interface CustomFunctionalInterface{
void print(String msg);
}
CustomFunctionalInterface cfi= msg -> System.out.println(msg);

2.3 方法引用

Lambda 表达式一种常用方法便是直接调用其他方法,针对这种情况,Java8 提供了一个简写语法,及方法引用,用于重用已有方法。

凡是可以使用 Lambda 表达式的地方,都可以使用方法引用。

方法应用的标准语法为 ClassName::methodName,虽然这是一个方法,但不需要再后面加括号,因为这里并不直接调用该方法。

1
2
3
4
5
6
7
8
Function<User, String> f1 = user->user.getName();
Function<User, String> f2 = User::getName;

Supplier<User> s1 = ()->new User();
Supplier<User> s2 = User::new;

Function<Integer, User[]> sa1 = count -> new User[count];
Function<Integer, User[]> sa2 = User[]::new;

方法引用主要分为如下几种类型:

  • 静态方法引用:className::methodName
  • 实例方法引用:instanceName::methodName
  • 超类实体方法引用:supper::mehtodName
  • 构造函数方法引用:className::new
  • 数组构造方法引用:ClassName[]::new

2.4 类型推断

类型推断,是 Java7 就引入的目标类型推断的扩展,在 Java8 中对其进行了改善,程序员可以省略 Lambda 表达式中的所有参数类型,Javac 会根据 Lambda 表达式式上下文信息自动推断出参数的正确类型。

大多数情况下 javac 能够准确的完成类型推断,但由于 Lambda 表达式与函数名无关,只与方法签名相关,因此会出现类型对推断失效的情况,这时可以使用手工类型转换帮助 javac 进行正确的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  // Supplier<String>, Callable<String> 具有相同的方法签名
private void print(Supplier<String> stringSupplier){
System.out.println("Hello " + stringSupplier.get());
}

private void print(Callable<String> stringCallable){
try {
System.out.println("Hello " + stringCallable.call());
} catch (Exception e) {
e.printStackTrace();
}
}

{
// Error, 因为两个print同时满足需求
print(()->"World");
// 使用类型转换,为编译器提供更多信息
print((Supplier<String>) ()->"World");

print((Callable<String>) ()-> "world");

}

3. 类库的升级改造

Java8 另一个变化是引入了 默认方法 和接口的 静态方法 ,自此以后 Java 接口中方法也可以包含代码体了。

3.1 默认方法

默认方法允许接口方法定义默认实现,而所有子类都将拥有该方法及实现。使其能够在不改变子类实现的情况下(很多时候我们无法拿到子类的源码),为所有子类添加新的功能,从而最大限度的保证二进制接口的兼容性。

默认方法的另一个优势是该方法是可选的,子类可以根据不同的需求 Override 默认实现,为其提供扩展性保证。

其中 Collection 中的 forEach,stream 功能都是通过该技术统一添加到接口中的。

1
2
3
4
5
6
7
8
9
10
11
// Collection 中的forEache实现
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
// Collection中的stream实现
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}

从上可见,默认方法的写法也是比较简单的,只需在方法声明中添加 defalut 关键字,然后提供方法的默认实现即可。

和类不同,接口中没有成员变量,因此默认方法只能通过调用子类的方法来修改子类本身,避免了对子类的实现做出各种假设。

3.1.1 默认方法与子类

添加默认方法特性后,方法的重写规则也发生了变化,具体的场景如下:

a. 没有重写

没有重写是最简单的情况,子类调用该方法的时候,自然继承了默认方法。

1
2
3
4
5
6
7
8
9
10
interface Parent{
default void welcome(){
System.out.println("Parent");
}
}

// 调用Parent中的welcome, 输入"Parent"
class ParentNotImpl implements Parent{

}

b. 子接口重写

子接口对父接口中的默认方法进行了重新,其子类方法被调用时,执行子接口中的默认方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 interface Parent{
default void welcome(){
System.out.println("Parent");
}
}


interface ChildInterface extends Parent{
@Override
default void welcome(){
System.out.println("ChildInterface");
}
}

// 执行ChildInterface中的welcome, 输入 "ChildInterface"
class ChildImpl implements ChildInterface{

}

c. 类重写

一旦类中重写了默认方法,优先选择类中定义的方法,如果存在多级类继承,遵循类继承逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 interface Parent{
default void welcome(){
System.out.println("Parent");
}
}


interface ChildInterface extends Parent{
@Override
default void welcome(){
System.out.println("ChildInterface");
}
}

//执行子类中的welcome方法,输出"ChildImpl"
class ChildImpl1 implements ChildInterface{
@Override
public void welcome(){
System.out.println("ChildImpl");
}
}
3.1.2 多重继承

接口允许多重继承,因此有可能会碰到两个接口包含签名相同的默认方法的情况,此时 javac 并不明确应该继承哪个接口中的方法,因此会导致编译出错,这时需要在类中实现该方法,如果想调用特定父接口中的默认方法,可以使用 ParentInterface.super.method() 的方式来指明具体的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
interface Parent1 {
default void print(){
System.out.println("parent1");
}
}

interface Parent2{
default void print(){
System.out.println("parent2");
}
}

class Child implements Parent1, Parent2{
@Override
public void print() {
System.out.println("self");
Parent1.super.print();
Parent2.super.print();
}
}

现在的接口提供了某种形式上的多继承功能,然而多重继承存在很多诟病。很多人认为多重继承的问题在于对象状态的继承,而不是代码块的继承,默认方法避免了状态的继承,也因此避免了 C++ 中多重继承最大的缺点。

接口和抽象类之间还是有明显的区别。接口允许多重继承,却没有成员变量;抽象类可以继承成员变量,却不能多重继承。

从某种角度出发,Java 通过接口默认方法实现了代码多重继承,通过类实现了状态单一继承。

3.1.3 三定律

如果对默认方法的工作原理,特别是在多重继承下的行为没有把握,可以通过下面三条简单定律帮助大家。

  1. 类胜于方法。
    如果在继承链中有方法体或抽象的方法声明,那么就可以忽略接口中定义的方法。
  2. 子类胜于父类。
    如果一个接口继承另一个接口,且两个接口都定义了一个默认方法,那么子接口中定义的方法胜出。
  3. 没有规则三。
    如果上面两条规则不适用,子类要么实现该方法,要么将该方法声明为抽象方法。

3.2 接口静态方法

人们在编程过程中积累了这样一条经验,创建一个包含很多静态方法的一个类。很多时候类是一个放置工具方法的好地方,比如 Java7 引入的 Objects 类,就包含很多工具方法,这些方法不是属于具体的某个类。

如果一个方法有充分的语义原因和某个概念相关,那么就应该讲该方法和相关的类或接口放在一起,而不是放到另一个工具类中,这非常有助于更好的组织代码。

在接口中定义静态方法,只需使用 static 关键字进行描述即可,例如 Stream 接口中的 of 方法。

1
2
3
4
5
6
7
8
9
10
/**
* Returns a sequential {@code Stream} containing a single element.
*
* @param t the single element
* @param <T> the type of stream elements
* @return a singleton sequential stream
*/
public static<T> Stream<T> of(T t) {
return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
}

3.3 Stream

Stream 是 Java8 中最耀眼的亮点,它使得程序员得以站在更高的抽象层次对集合进行操作。

Stream 是用函数式编程方式在集合类上进行复杂操作的工具。

3.3.1. 从外部迭代到内部迭代

Java 程序员使用集合时,一个通用模式就是在集合上进行迭代,然后处理返回的每一个元素,尽管这种操作可行但存在几个问题:

  • 大量的样板代码
  • 模糊了程序本意
  • 串行化执行

常见集合遍历如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 // 常见写法1,不推荐使用
public void printAll1(List<String> msg){
for (int i=0; i< msg.size(); i++){
String m = msg.get(i);
System.out.println(m);
}
}

// Java5之前,正确写法,过于繁琐
public void printAll2(List<String> msg){
Iterator<String> iterator = msg.iterator();
while (iterator.hasNext()){
String m = iterator.next();
System.out.println(m);
}
}

// Java5之后,加强for循环,采用语法糖,简化for循环,内部转化为Iterator方式
public void printAll3(List<String> msg){
for (String m : msg){
System.out.println(m);
}
}

整个迭代过程,通过显示的调用 Iterator 对象的 hasNext 和 next 方法完成整个迭代,这成为外部迭代。

外部迭代

另一种方式成为内部迭代,及将操作行为作为参数传递给 Stream,在 Stream 内部完成迭代操作。

1
2
3
4
 // Java8中,使用Stream进行内部迭代操作
public void printAll4(List<String> msg){
msg.stream().forEach(System.out::println);
}

内部迭代:

内部迭代

3.3.2. 惰性求值 VS 及早求值

Stream 中存在两类方法,不产生值的方法称为惰性方法;从 Stream 中产生值的方法叫做及早求值方法。

判断一个方法的类别很简单:如果返回值是 Stream,那么就是惰性方法;如果返回值是另一个值或为空,那么就是及早求值方法。

惰性方法返回的 Stream 对象不是一个新的集合,而是创建新集合的配方,Stream 本身不会做任何迭代操作,只有调用及早求值方法时,才会开始真正的迭代。

整个过程与 Builder 模式有共通之处,惰性方法负责对 Stream 进行装配(设置 builder 的属性),调用及早求值方法时(调用 builder 的 build 方法),按照之前的装配信息进行迭代操作。

常见 Stream 操作:

3.3.2.1 collect(toList())

及早求值方法:

collect(toList()) 方法由 Stream 里面的值生成一个列表,是一个及早求值操作。

collect 的功能不仅限于此,它是一个非常强大的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 @Data
class User{
private String name;
}
public List<String> getNames(List<User> users){
List<String> names = new ArrayList<>();
for (User user : users){
names.add(user.getName());
}
return names;
}

public List<String> getNamesUseStream(List<User> users){
// 方法引用
//return users.stream().map(User::getName).collect(toList());
// lambda表达式
return users.stream().map(user -> user.getName()).collect(toList());
}

3.3.2.2. count、max、min

及早求值方法:

Stream 上最常用的操作之一就是求总数、最大值和最小值,count、max 和 min 足以解决问题。

1
2
3
4
5
6
7
8
9
10
11
12
public Long getCount(List<User> users){
return users.stream().filter(user -> user != null).count();
}
// 求最小年龄
public Integer getMinAge(List<User> users){
return users.stream().map(user -> user.getAge()).min(Integer::compareTo).get();
}

// 求最大年龄
public Integer getMaxAge(List<User> users){
return users.stream().map(user -> user.getAge()).max(Integer::compareTo).get();
}

min 和 max 入参是一个 Comparator 对象,用于元素之间的比较,返回值是一个 Optional<T>,它代表一个可能不存在的值,如果 Stream 为空,那么该值不存在,如果不为空,该值存在。通过 get 方法可以获取 Optional 中的值。

3.3.2.3 findAny、findFirst

及早求值方法:

两个函数都以Optional为返回值,用于表示是否找到。

1
2
3
4
5
6
7
8
9
10
11
public Optional<User> getAnyActiveUser(List<User> users){
return users.stream()
.filter(user -> user.isActive())
.findAny();
}

public Optional<User> getFirstActiveUser(List<User> users){
return users.stream()
.filter(user -> user.isActive())
.findFirst();
}

3.3.2.4 allMatch、anyMatch、noneMatch

及早求值方法:

均以 Predicate 作为输入参数,对集合中的元素进行判断,并返回最终的结果。

1
2
3
4
5
6
// 所有用户是否都已激活
boolean allMatch = users.stream().allMatch(user -> user.isActive());
// 是否有激活用户
boolean anyMatch = users.stream().anyMatch(user -> user.isActive());
// 是否所有用户都没有激活
boolean noneMatch = users.stream().noneMatch(user -> user.isActive());

3.3.2.6. forEach

及早求值:

以 Consumer 为参数,对 Stream 中复合条件的对象进行操作。

1
2
3
4
5
6
public void printActiveName(List<User> users){
users.stream()
.filter(user -> user.isActive())
.map(user -> user.getName())
.forEach(name -> System.out.println(name));
}

3.3.2.7 reduce

及早求值方法:

reduce 操作可以实现从一组值中生成一个值,之前提到的 count、min、max 方法因为比较通用,单独提取成方法,事实上,这些方法都是通过 reduce 完成的。

下图展示的是对 stream 进行求和的过程,以 0 为起点,每一步都将 stream 中的元素累加到 accumulator 中,遍历至最后一个元素,accumulator 就是所有元素值的和。

Stream求和过程

3.3.2.8. filter

惰性求值方法:

以 Predicate 作为参数(相当于 if 语句),对 Stream 中的元素进行过滤,只有复合条件的元素才能进入下面的处理流程。

处理流程如下:

Stream Filter操作

1
2
3
4
5
public List<User> getActiveUser(List<User> users){
return users.stream()
.filter(user -> user.isActive())
.collect(toList());
}

3.3.2.9 map

及早求值方法:
以 Function 作为参数,将 Stream 中的元素从一种类型转换成另外一种类型。

处理过程如下:

Stream Map

1
2
3
4
5
public List<String> getNames(List<User> users){
return users.stream()
.map(user -> user.getName())
.collect(toList());
}

3.3.2.10 peek

Stream 提供的是内迭代,有时候为了功能调试,需要查看每个值,同时能够继续操作流,这时就会用到 peek 方法。

1
2
3
4
5
6
7
public void printActiveName(List<User> users){
users.stream()
.filter(user -> user.isActive())
.peek(user -> System.out.println(user.isActive()))
.map(user -> user.getName())
.forEach(name -> System.out.println(name));
}

3.3.2.11 其他

针对集合 Stream 还提供了许多功能强大的操作,暂不一一列举,简单汇总一下。

  • distinct:进行去重操作
  • sorted:进行排序操作
  • limit:限定结果输出数量
  • skip:跳过 n 个结果,从 n+1 开始输出

3.4 Optional

Java 程序中出现最多的异常就是 NullPointerException,没有之一。Optional 的出现力求改变这一状态。

Optional 对象相当于值的容器,而该值可以通过 get 方法获取,同时 Optional 提供了很多函数用于对值进行操作,从而最大限度的避免 NullPointerException 的出现。

Optional 与 Stream 的用法基本类型,所提供的方法同样分为惰性和及早求值两类,惰性方法主要用于流程组装,及早求值用于最终计算。

3.4.1 of

使用工厂方法 of,可以从一个值中创建一个 Optional 对象,如果值为 null,会报 NullPointerException。

1
2
3
4
5
Optional<String> dataOptional = Optional.of("a");
String data = dataOptional.get(); // data is "a"

Optional<String> dataOptional = Optional.of(null);
String data = dataOptional.get(); // throw NullPointerException

3.4.2 empty

工厂方法 empty,可以创建一个不包含任何值的 Optional 对象。

1
2
Optional<String> dataOptional = Optional.empty();
String data = dataOptional.get(); //throw NoSuchElementException

3.4.3 ofNullable

工厂方法 ofNullable,可将一个空值转化成 Optional。

1
2
3
 public static <T> Optional<T> ofNullable(T value) {
return value == null ? empty() : of(value);
}

3.4.4 get、orElse、orElseGet、orElseThrow

直接求值方法,用于获取 Optional 中值,避免空指针异常的出现。

1
2
3
4
5
Optional<String> dataOptional = Optional.of("a");
dataOptional.get(); // 获取Optional中的值, 不存在会抛出NoSuchElementException
dataOptional.orElse("b"); //获取Optional中的值,不存在,直接返回"B"
dataOptional.orElseGet(()-> String.valueOf(System.currentTimeMillis())); //获取Optional中的值,不存在,对Supplier进行计算,并返回计算结果
dataOptional.orElseThrow(()-> new XXXException()); //获取Optional中的值,不存在,抛出自定义异常

3.4.5 isPresent、ifPresent

直接求值方法,isPresent 用于判断 Optional 中是否有值,ifPresent 接收 Consumer 对象,当 Optional 有值的情况下执行。

1
2
3
4
5
6
7
8
9
10
11
Optional<String> dataOptional = Optional.of("a");
String value = null;
if (dataOptional.isPresent()){
value = dataOptional.get();
}else {
value = "";
}
//等价于
String value2 = dataOptional.orElse("");
// 当Optional中有值的时候执行
dataOptional.ifPresent(v->System.out.println(v));

3.4.6 map

惰性求值方法。map 与 Stream 中的用法基本相同,用于对 Optional 中的值进行映射处理,从而避免了大量 if 语句嵌套,多个 map 组合成链,只需对最终的结果进行操作,中间过程中如果存在 null 值,之后的 map 不会执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Data
static class Order{
private Name owner;
}

@Data
static class User{
private Name name;
}

@Data
static class Name{
String firstName;
String midName;
String lastName;
}
private String getFirstName(Order order){
if (order == null){
return "";
}
if (order.getOwner() == null){
return "";
}
if (order.getOwner().getFirstName() == null){
return "";
}
return order.getOwner().getFirstName();
}
private String getFirstName(Optional<Order> orderOptional){
return orderOptional.map(order -> order.getOwner())
.map(user->user.getFirstName())
.orElse("");
}

3.4.7 filter

惰性求值,对 Optional 中的值进行过滤,如果 Optional 为 empty,直接返回 empty;如果 Optional 中存在值,则对值进行验证,验证通过返回原 Optional,验证不通过返回 empty。

1
2
3
4
5
6
7
 public Optional<T> filter(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
if (!isPresent())
return this;
else
return predicate.test(value) ? this : empty();
}

4. Lambda 下模式的进化

设计模式是人们熟悉的一种设计思路,他是软件架构中解决通用问题的模板,将解决特定问题的最佳实践固定下来,但设计模式本身会比较复杂,包含多个接口、若干个实现类,应用过程相对繁琐,这也是影响其应用的原因之一。

Lambda 表达式大大简化了 Java 中行为传递的问题,对于很多行为式设计模式而言,减少了不少构建成本。

4.1 命令模式

命令者是一个对象,其封装了调用另一个方法的实现细节,命令者模式使用该对象可以编写根据运行时条件,顺序调用方法的一般性代码。

大多数命令模式中的命令对象,其实是一种行为的封装,甚至是对其他对象内部行为的一种适配,这种情况下,Lambda 表达式并有了用武之地。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
interface Command{
void act();
}

interface Editor{
void open();
void write(String data);
void save();
}

class CommandRunner{
private List<Command> commands = new ArrayList<>();

public void run(Command command){
command.act();
this.commands.add(command);
}

public void redo(){
this.commands.forEach(Command::act);
}
}

class OpenCommand implements Command{
private final Editor editor;

OpenCommand(Editor editor) {
this.editor = editor;
}

@Override
public void act() {
this.editor.open();
}
}

class WriteCommand implements Command{
private final Editor editor;
private final String data;
WriteCommand(Editor editor, String data) {
this.editor = editor;
this.data = data;
}

@Override
public void act() {
editor.write(this.data);
}
}

class SaveCommand implements Command{
private final Editor editor;

SaveCommand(Editor editor) {
this.editor = editor;
}

@Override
public void act() {
this.editor.save();
}
}

public void useCommand(){
CommandRunner commandRunner = new CommandRunner();
Editor editor = new EditorImpl();
String data1 = "data1";
String data2 = "data2";
commandRunner.run(new OpenCommand(editor));
commandRunner.run(new WriteCommand(editor, data1));
commandRunner.run(new WriteCommand(editor, data2));
commandRunner.run(new SaveCommand(editor));
}

public void useLambda(){
CommandRunner commandRunner = new CommandRunner();
Editor editor = new EditorImpl();
String data1 = "data1";
String data2 = "data2";
commandRunner.run(()->editor.open());
commandRunner.run(()->editor.write(data1));
commandRunner.run(()->editor.write(data2));
commandRunner.run(()->editor.save());
}



class EditorImpl implements Editor{

@Override
public void open() {

}

@Override
public void write(String data) {

}

@Override
public void save() {

}
}

从代码中可见,Lambda 表达式的应用,减少了创建子类的负担,增加了代码的灵活性。

4.2 策略模式

策略模式能够在运行时改变软件的算法行为,其核心的实现思路是,使用不同的算法来解决同一个问题,然后将这些算法封装在一个统一的接口背后。

可见策略模式也是一种行为行为传递的模式。

策略模式下的压缩算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
interface CompressionStrategy{
OutputStream compress(OutputStream outputStream) throws IOException;
}

class GzipBasedCompressionStrategy implements CompressionStrategy{

@Override
public OutputStream compress(OutputStream outputStream) throws IOException {
return new GZIPOutputStream(outputStream);
}
}

class ZipBasedCompressionStrategy implements CompressionStrategy{

@Override
public OutputStream compress(OutputStream outputStream) throws IOException {
return new ZipOutputStream(outputStream);
}
}

class Compressor{
private final CompressionStrategy compressionStrategy;

Compressor(CompressionStrategy compressionStrategy) {
this.compressionStrategy = compressionStrategy;
}

public void compress(Path inFile, File outFile) throws IOException {
try (OutputStream outputStream = new FileOutputStream(outFile)){
Files.copy(inFile, this.compressionStrategy.compress(outputStream));
}
}
}

{
Compressor gzipCompressor = new Compressor(new GzipBasedCompressionStrategy());
gzipCompressor.compress(in,out);

Compressor ziCompressor = new Compressor(new ZipBasedCompressionStrategy());
ziCompressor.compress(in,out);
}

{
Compressor gzipCompressor = new Compressor(GZIPOutputStream::new);
gzipCompressor.compress(in,out);

Compressor ziCompressor = new Compressor(ZipOutputStream::new);
ziCompressor.compress(in,out);
}

4.3 观察者模式

观察者模式中,被观察者持有观察者的一个列表,当被观察者的状态发送变化时,会通知观察者。

对于一个观察者来说,往往是对一个行为的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
interface NameObserver{
void onNameChange(String oName, String nName);
}

@Data
class User {
private final List<NameObserver> nameObservers = new ArrayList<>();
@Setter(AccessLevel.PRIVATE)
private String name;

public void updateName(String nName){
String oName = getName();
setName(nName);
nameObservers.forEach(nameObserver -> nameObserver.onNameChange(oName, nName));
}

public void addObserver(NameObserver nameObserver){
this.nameObservers.add(nameObserver);
}
}

class LoggerNameObserver implements NameObserver{

@Override
public void onNameChange(String oName, String nName) {
System.out.println(String.format("old Name is %s, new Name is %s", oName, nName));
}
}

class NameChangeNoticeObserver implements NameObserver{

@Override
public void onNameChange(String oName, String nName) {
notic.send(String.format("old Name is %s, new Name is %s", oName, nName));
}
}

{
User user = new User();
user.addObserver(new LoggerNameObserver());
user.addObserver(new NameChangeNoticeObserver());
user.updateName("张三");
}

{
User user = new User();
user.addObserver((oName, nName) ->
System.out.println(String.format("old Name is %s, new Name is %s", oName, nName)));
user.addObserver((oName, nName) ->
notic.send(String.format("old Name is %s, new Name is %s", oName, nName)));
user.updateName("张三");
}

4.4 模板方法模式

模板方法将整体算法设计成一个抽象类,他有一系列的抽象方法,代表方法中可被定制的步骤,同时这个类中包含一些通用代码,算法的每一个变种都由具体的类实现,他们重新抽象方法,提供相应的实现。

模板方法,实际是行为的一种整合,内部大量用到行为的传递。
先看一个标准的模板方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
interface UserChecker{
void check(User user);
}

abstract class AbstractUserChecker implements UserChecker{
@Override
public final void check(User user){
checkName(user);
checkAge(user);
}
abstract void checkName(User user);

abstract void checkAge(User user);
}

class SimpleUserChecker extends AbstractUserChecker {

@Override
void checkName(User user) {
Preconditions.checkArgument(StringUtils.isNotEmpty(user.getName()));
}

@Override
void checkAge(User user) {
Preconditions.checkArgument(user.getAge() != null);
Preconditions.checkArgument(user.getAge().intValue() > 0);
Preconditions.checkArgument(user.getAge().intValue() < 150);
}
}

{
UserChecker userChecker = new SimpleUserChecker();
userChecker.check(new User());
}

class LambdaBaseUserChecker implements UserChecker{
private final List<Consumer<User>> userCheckers = Lists.newArrayList();
public LambdaBaseUserChecker(List<Consumer<User>>userCheckers){
this.userCheckers.addAll(userCheckers);
}

@Override
public void check(User user){
this.userCheckers.forEach(userConsumer -> userConsumer.accept(user));
}
}

{
UserChecker userChecker = new LambdaBaseUserChecker(Arrays.asList(
user -> Preconditions.checkArgument(StringUtils.isNotEmpty(user.getName())),
user -> Preconditions.checkArgument(user.getAge() != null),
user -> Preconditions.checkArgument(user.getAge().intValue() > 0),
user -> Preconditions.checkArgument(user.getAge().intValue() < 150)
));

userChecker.check(new User());
}

@Data
class User{
private String name;
private Integer age;
}

在看一个 Spring JdbcTemplate,如果使用 Lambda 进行简化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public JdbcTemplate jdbcTemplate;

public User getUserById(Integer id){
return jdbcTemplate.query("select id, name, age from tb_user where id = ?", new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setInt(1, id);
}
}, new ResultSetExtractor<User>() {
@Override
public User extractData(ResultSet resultSet) throws SQLException, DataAccessException {
User user = new User();
user.setId(resultSet.getInt("id"));
user.setName(resultSet.getString("name"));
user.setAge(resultSet.getInt("age"));
return user;
}
});
}

public User getUserByIdLambda(Integer id){
return jdbcTemplate.query("select id, name, age from tb_user where id = ?",
preparedStatement -> preparedStatement.setInt(1, id),
resultSet -> {
User user = new User();
user.setId(resultSet.getInt("id"));
user.setName(resultSet.getString("name"));
user.setAge(resultSet.getInt("age"));
return user;
});
}

@Data
class User {
private Integer id;
private String name;
private Integer age;
}

5. Lambda 下并发程序

并发与并行:

  • 并发是两个任务共享时间段,并行是两个任务同一时间发生。
  • 并行化是指为了缩短任务执行的时间,将任务分解为几个部分,然后并行执行,这和顺序执行的工作量是一样的,区别是多个 CPU 一起来干活,花费的时间自然减少了。
  • 数据并行化。数据并行化是指将数据分为块,为每块数据分配独立的处理单元。

并行和并发的区别

5.1 并行化流操作

并行化流操作是 Stream 提供的一个特性,只需改变一个方法调用,就可以让其拥有并行操作的能力。

如果已经存在一个 Stream 对象,调用他的 parallel 方法就能让其并行执行。

如果已经存在一个集合,调用 parallelStream 方法就能获取一个拥有并行执行能力的 Stream。

并行流主要解决如何高效使用多核 CPU 的事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Data
class Account{
private String name;
private boolean active;
private Integer amount;
}

public int getActiveAmount(List<Account> accounts){
return accounts.parallelStream()
.filter(account -> account.isActive())
.mapToInt(account -> account.getAmount())
.sum();
}

public int getActiveAmount2(List<Account> accounts){
return accounts.stream()
.parallel()
.filter(account -> account.isActive())
.mapToInt(Account::getAmount)
.sum();
}

并行流底层使用 fork/join 框架,fork 递归式的分解问题,然后每个段并行执行,最终有 join 合并结果,返回最后的值。

Fork/Join分解合并问题

5.2 阻塞 IO VS 非阻塞 IO

BIO VS NIO

BIO 阻塞式 IO,是一种通用且容易理解的方式,与程序交互时通常都符合这种顺序执行的方式,但其主要的缺陷在于每个 socket 会绑定一个 Thread 进行操作,当长链过多时会消耗大量的 Server 资源,从而导致其扩展性性下降。

NIO 非阻塞 IO,一般指的是 IO 多路复用,可以使用一个线程同时对多个 socket 的读写进行监控,从而使用少量线程服务于大量 Socket。

由于客户端开发的简便性,大多数的驱动都是基于 BIO 实现,包括 MySQL、Redis、Mongo 等;在服务器端,由于其高性能的要求,基本上是 NIO 的天下,以最大限度的提升系统的可扩展性。

由于客户端存在大量的 BIO 操作,我们的客户端线程会不停的被 BIO 阻塞,以等待操作返回值,因此线程的效率会大打折扣。

BIO Client

如上图,线程在 IO 与 CPU 之间不停切换,走走停停,同时线程也没有办法释放,一直等到任务完成。

5.3 Future

构建并发操作的另一种方案便是 Future,Future 是一种凭证,调用方法不是直接返回值,而是返回一个 Future 对象,刚创建的 Future 为一个空对象,由后台线程执行耗时操作,并在结束时将结果写回到 Future 中。

当调用 Future 对象的 get 方法获取值时,会有两个可能,如果后台线程已经运行完成,则直接返回;如果后台线程没有运行完成,则阻塞调用线程,知道后台线程运行完成或超时。

使用 Future 方式,可以以并行的方式运行多个子任务。

当主线程需要调用比较耗时的操作时,可以将其放在辅助线程中执行,并在需要数据的时候从 future 中获取,如果辅助线程已经运行完成,则立即拿到返回的结果,如果辅助线程还没有运行完成,则主线程等待,并在完成时获取结果。

基于Future的并发操作

一种常见的场景是在Controller中从多个Service中获取结果,并将其封装成一个View对象返回给前端用于显示,假设需要从三个接口中获取结果,每个接口的平均响应时间是20ms,那按照串行模式,总耗时为sum(i1, i2, i3) = 60ms;如果按照Future并发模式将加载任务交由辅助线程处理,总耗时为max(i1, i2, i3 ) = 20ms, 大大减少了系统的响应时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
 private ExecutorService executorService = Executors.newFixedThreadPool(20);

private User loadUserByUid(Long uid){
sleep(20);
return new User();
}

private Address loadAddressByUid(Long uid){
sleep(20);
return new Address();

}

private Account loadAccountByUid(Long uid){
sleep(20);
return new Account();
}

/**
* 总耗时 sum(LoadUser, LoadAddress, LoadAccount) = 60ms
* @param uid
* @return
*/
public View getViewByUid1(Long uid){
User user = loadUserByUid(uid);
Address address = loadAddressByUid(uid);
Account account = loadAccountByUid(uid);
View view = new View();
view.setUser(user);
view.setAddress(address);
view.setAccount(account);
return view;
}

/**
* 总耗时 max(LoadUser, LoadAddress, LoadAccount) = 20ms
* @param uid
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public View getViewByUid(Long uid) throws ExecutionException, InterruptedException {
Future<User> userFuture = executorService.submit(()->loadUserByUid(uid));
Future<Address> addressFuture = executorService.submit(()->loadAddressByUid(uid));
Future<Account> accountFuture = executorService.submit(()->loadAccountByUid(uid));
View view = new View();
view.setUser(userFuture.get());
view.setAddress(addressFuture.get());
view.setAccount(accountFuture.get());
return view;
}



private void sleep(long time){
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Data
class View{
private User user;
private Address address;
private Account account;
}

class User{

}

class Address{

}

class Account{

}

Future 方式存在一个问题,及在调用 get 方法时会阻塞主线程,这是资源的极大浪费,我们真正需要的是一种不必调用 get 方法阻塞当前线程,就可以操作 future 对象返回的结果。

上例中只是子任务能够拆分并能并行执行的一种典型案例,在实际开发过程中,我们会遇到更多、更复杂的场景,比如:

  • 将两个 Future 结果合并成一个,同时第二个又依赖于第一个的结果
  • 等待 Future 集合中所有记录的完成
  • 等待 Future 集合中的最快的任务完成
  • 定义任务完成后的操作

对此,我们引入了 CompletableFuture 对象。

5.4 CompletableFuture

  • CompletableFuture 结合了 Future 和回调两种策略,以更好的处理事件驱动任务。
  • CompletableFuture 与Stream 的设计思路一致,通过注册 Lambda 表达式,把高阶函数链接起来,从而定制更复杂的处理流程。

CompletableFuture 提供了一组函数用于定义流程,其中包括:

5.4.1 创建函数

CompletableFuture 提供了一组静态方法用于创建 CompletableFuture 实例:

1
2
3
4
5
6
7
8
9
public static <U> CompletableFuture<U> completedFuture(U value)// :使用已经创建好的值,创建 CompletableFuture 对象。

public static CompletableFuture<Void> runAsync(Runnable runnable)// 基于 Runnable 创建 CompletableFuture 对象,返回值为 Void,及没有返回值

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)// 基于 Runnable 和自定义线程池创建 CompletableFuture 对象,返回值为 Void,及没有返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)// 基于 Supplier 创建 CompletableFuture 对象,返回值为 U

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) // 基于 Supplier 和自定义线程池创建 CompletableFuture 对象,返回值为 U

以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

方法的参数类型都是函数式接口,所以可以使用 Lambda 表达式实现异步任务。

5.4.2 计算结果完成后

当 CompletableFuture 计算完成或者计算过程中抛出异常时进行回调。

1
2
3
4
5
6
7
public CompletableFuture<T> 	whenComplete(BiConsumer<? super T,? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。

exceptionally 针对异常情况进行处理,当原始的 CompletableFuture 抛出异常的时候,就会触发这个 CompletableFuture 的计算。

下面一组方法虽然也返回 CompletableFuture 对象,但是对象的值和原来的 CompletableFuture 计算的值不同。当原先的 CompletableFuture 的值计算完成或者抛出异常的时候,会触发这个 CompletableFuture 对象的计算,结果由 BiFunction 参数计算而得。因此这组方法兼有 whenComplete 和转换的两个功能。

1
2
3
4
5
public <?U> CompletableFuture<?U> handle(BiFunction<? super T,Throwable,? extends U> fn)

public <?U> CompletableFuture<?U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)

public <?U> CompletableFuture<?U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
5.4.3 转化函数

转化函数类似于 Stream 中的惰性求助函数,主要对 CompletableFuture 的中间结果进行流程定制。

1
2
3
4
5
public <U> CompletableFuture<U> 	thenApply(Function<? super T,? extends U> fn)

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

通过函数完成对 CompletableFuture 中的值得转化,Async 在线的线程池中处理,Executor 可以自定义线程池。

5.4.4 纯消费函数

上面的方法当计算完成的时候,会生成新的计算结果 (thenApply, handle),或者返回同样的计算结果 whenComplete,CompletableFuture 还提供了一种处理结果的方法,只对结果执行 Action,而不返回新的计算值,因此计算值为 Void。

1
2
3
4
5
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

其他的参数类型与之前的含义一致,不同的是函数接口 Consumer,这个接口只有输入,没有返回值。

thenAcceptBoth 以及相关方法提供了类似的功能,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的 action,它用来组合另外一个异步的结果。

1
2
3
4
5
6
7
8
9
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<?
extends U> other, BiConsumer<? super T,? super U> action)

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends
U> other, BiConsumer<? super T,? super U> action)

public <U>
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends
U> other, BiConsumer<? super T,? super U> action, Executor executor)

5.4.5. 组合函数

组合函数主要应用于后续计算需要 CompletableFuture 计算结果的场景。

1
2
3
4
5
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

这一组方法接受一个 Function 作为参数,这个 Function 的输入是当前的 CompletableFuture 的计算值,返回结果将是一个新的 CompletableFuture,这个新的 CompletableFuture 会组合原来的 CompletableFuture 和函数返回的 CompletableFuture。因此它的功能类似:

1
A +–> B +—> C

下面的一组方法 thenCombine 用来复合另外一个 CompletionStage 的结果。两个 CompletionStage 是并行执行的,它们之间并没有先后依赖顺序,other 并不会等待先前的 CompletableFuture 执行完毕后再执行,当两个 CompletionStage 全部执行完成后,统一调用 BiFunction 函数,计算最终的结果。

1
2
3
4
5
6
7
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<?
extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<?
extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<?
extends U> other, BiFunction<? super T,? super U,? extends V> fn,
Executor executor)

5.4.6. Either

Either 系列方法不会等两个 CompletableFuture 都计算完成后执行计算,而是当任意一个 CompletableFuture 计算完成的时候就会执行。

1
2
3
4
5
6
7
8
9
10
11
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

5.4.7 辅助方法

辅助方法主要指 allOf 和 anyOf,这两个静态方法用于组合多个 CompletableFuture。

1
2
3
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)// allOf方法是当所有的CompletableFuture都执行完后执行计算。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)// anyOf方法是当任意一个CompletableFuture执行完后就会执行计算。
wenxinzizhu wechat
扫一扫,添加我的微信,一起交流共同成长(备注为技术学习)