首页 观察者模式
文章
取消

观察者模式

观察者模式原理和实现

定义

观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-SubscribeDesign Pattern):在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。

一般来说依赖的对象被称为观察者(Observer)

标准实现(模板代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * 被观察者的上层接口
 */
public interface Subject {
  void registerObserver(Observer observer);
  void removeObserver(Observer observer);
  void notifyObservers(Message message);
}

/**
 * 观察者的上层接口
 */
public interface Observer {
  void update(Message message);
}

/**
 * 被观察者的实现类
 */
public class ConcreteSubject implements Subject {
  private List<Observer> observers = new ArrayList<Observer>();
  @Override
  public void registerObserver(Observer observer) {
    observers.add(observer);
  }
  @Override
  public void removeObserver(Observer observer) {
    observers.remove(observer);
  }
  @Override
  public void notifyObservers(Message message) {
    for (Observer observer : observers) {
      observer.update(message);
    }
  }
}

/**
 * 观察者 1 号
 */
public class ConcreteObserverOne implements Observer {
  @Override
  public void update(Message message) {
    //TODO: 获取消息通知,执行自己的逻辑...
    System.out.println("ConcreteObserverOne is notified.");
  }
}

/**
 * 观察者 2 号
 */
public class ConcreteObserverTwo implements Observer {
  @Override
  public void update(Message message) {
    //TODO: 获取消息通知,执行自己的逻辑...
    System.out.println("ConcreteObserverTwo is notified.");
  }
}

/**
 * 启动类
 */
public class Demo {
  public static void main(String[] args) {
    ConcreteSubject subject = new ConcreteSubject();
    subject.registerObserver(new ConcreteObserverOne());
    subject.registerObserver(new ConcreteObserverTwo());
    subject.notifyObservers(new Message());
  }
}

截然不同的实现方式

为什么需要观察者模式

观察者是一种行为型设计模式,也就是定义中说的当对象的状态发生改变时,所有依赖的对象都应该接受通知,并作出一定的行为。

以一个 P2P 投资理财系统为例,用户用户注册成功之后,我们会给用户发放投资体验金。代码大致如下:

1
2
3
4
5
6
7
8
9
10
11
public class UserController {
  private UserService userService; // 依赖注入
  private PromotionService promotionService; // 依赖注入
  public Long register(String telephone, String password) {
    //省略输入参数的校验代码
    //省略userService.register()异常的try-catch代码
    long userId = userService.register(telephone, password);
    promotionService.issueNewUserExperienceCash(userId);
    return userId;
  }
}

显然注册接口做了两件事:注册用户和发放体验经,虽然职责并不单一,但如果不需要扩展需求,则当前代码是可以接受的。

相反,如果需求频繁变动,比如,用户注册成功之后,不再发放体验金,而是改为发放优惠券,并且还要给用户发送一封“欢迎注册成功”的站内信。这种情况下,我们就需要频繁地修改register()函数中的代码,违反开闭原则。而且,如果注册成功之后需要执行的后续操作越来越多,那register()函数的逻辑会变得越来越复杂,也就影响到代码的可读性和可维护性。

这时候观察者模式就能被利用起来了,实际上观察者模式把不同的应对行为,如发放体验金、发放优惠券或者发送站内信抽象为了一个独立的观察者,这样就把经常变动的行为从被观察者中解耦出来了。这样新增或者删除观察者都不会影响到被观察者中的逻辑。

同步阻塞的实现方式

即观察者和被观察者代码在同一个线程内执行,如上面的模板代码。被观察者一直阻塞,直到所有的观察者代码都执行完成 之后,才执行后续的代码。

异步非阻塞的实现方式

还以 P2P 投资理财系统为例,如果注册接口是一个调用非常频繁的接口,对性能非常敏感,那么就可以采用异步非阻塞的实现方式:

  1. 让每个观察者的执行函数在新的线程中执行
  2. 以线程池的方式执行
  3. 基于 EventBus 来实现,比如 Google Guava EventBus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 第一种实现方式,频繁创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。
public class RegPromotionObserver implements RegObserver {
  private PromotionService promotionService; // 依赖注入
  @Override
  public void handleRegSuccess(Long userId) {
    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        promotionService.issueNewUserExperienceCash(userId);
      }
    });
    thread.start();
  }
}
// 第二种实现方式,线程池、异步执行逻辑都耦合在了register()函数中,增加了这部分业务代码的维护成本。

public class UserController {
  private UserService userService; // 依赖注入
  private List<RegObserver> regObservers = new ArrayList<>();
  private Executor executor;
  public UserController(Executor executor) {
    this.executor = executor;
  }
  public void setRegObservers(List<RegObserver> observers) {
    regObservers.addAll(observers);
  }
  public Long register(String telephone, String password) {
    //省略输入参数的校验代码
    //省略userService.register()异常的try-catch代码
    long userId = userService.register(telephone, password);
    for (RegObserver observer : regObservers) {
      executor.execute(new Runnable() {
        @Override
        public void run() {
          observer.handleRegSuccess(userId);
        }
      });
    }
    return userId;
  }
}

这两种实现方式下缺点都挺严重,代码也很不优雅,并且如果我们的需求更加极端一点,需要在同步阻塞和异步非阻塞之间灵活切换,那就要不停地修改UserController的代码。除此之外,如果在项目中,不止一个业务模块需要用到异步非阻塞观察者模式,那这样的代码实现也无法做到复用。

这样针对异步非阻塞观察者模式,我们也可以将它抽象成框架来达到这样的效果,比如 Google Guava EventBus,当然你也可以实现自己的 EventBus(但尽量不要重复造轮子)

跨进程的观察者模式

上面描述的同步阻塞和异步非阻塞实现方式都是在同一个进程中进行的,那如何跨进程实现观察者模式呢?

  1. 调用 RPC 接口发送数据
  2. 基于消息队列(如 ActiveMQ)实现

基于消息队列的方式实现下,被观察者和观察者解耦更加彻底,两部分的耦合更小。被观察者完全不感知观察者,同理,观察者也完全不感知被观察者。被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑。

缺点是需要引入一个新的系统(消息队列),增加了维护成本。

本文由作者按照 CC BY 4.0 进行授权

模板模式

状态模式