解耦神器之观察者模式

大家是不是会经常听到有人说写代码要“高内聚、低耦合”,但是你要是问他什么是高内聚低耦合(waht)?为什么要高内聚、低耦合(why)?怎么做才能高内聚、低耦合(how)? 这些问题就不一定都能回答的了!

百度百科:在软件设计中通常用耦合度和内聚度作为衡量模块独立程度的标准。划分模块的一个准则是高内聚低耦合。
从模块粒度来看,高内聚:尽可能类的每个成员方法只完成一件事(最大限度的聚合); 低耦合:减少类内部,一个成员方法调用另一个成员方法。
从类角度来看, 高内聚低耦合:减少类内部,对其他类的调用;
从功能块来看,高内聚低耦合:减少模块之间的交互复杂度(接口数量,参数数据)即横向:类与类之间、模块与模块之间;纵向:层次之间;尽可能,内容内聚,数据耦合。

我本人特别喜欢李连杰演的这一版霍元甲,他年少轻狂,好勇斗狠,勇争津门第一,中年家破人亡,流落他乡悟出了侠的真谛,成为真正的大侠霍元甲。当然我不太希望我的一生这么波荡起伏,因为我没有他那么厉害。但是我特别喜欢剧中的一句台词,就是“功夫是需要时间去磨练出来的,两三年的猫脚功夫,这一拳20年的功夫,你们挡得住么?。作为一位程序员,我们对业务的理解、系统的设计、代码实现这些能力也是需要磨练的,而且也要日复一日的练习,不然你怎么挡得住面试官20年的功夫。😄😄

霍元甲

本文对于什么是高内聚低耦合(waht)?为什么要高内聚、低耦合(why)?按下不表,就先说说一种能够让你的代码高内聚低耦合的方法: 观察者模式,这种设计模式我认为就和策略模式一样的常用,这两种设计模式不仅实现起来简单,而且效果还好!其中的观察者模式简直是降低模块耦合性提高我们代码逼格的神奇啊! 那接下来就先说说观察者模式组成角色:

抽象主题(Subject)

它把所有观察者对象的引用保存到一个容器里,每个主题都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者对象。

具体主题(ConcreteSubject)

将有关状态存入具体观察者对象;在具体主题内部状态改变时,给所有登记过的观察者发出通知。

抽象抽象观察者(Observer)

为所有的具体观察者定义一个接口,在得到主题通知时更新自己。

具体观察者(ConcreteObserver)

实现抽象观察者角色所要求的更新接口,以便使本身的状态与主题状态协调。

Java原生实现

Java从JDK1.0开始就自带了 java.util.Observerjava.util.Observable ,相关接口、类来实现观察者模式,由此可见Java对于观察者模式的重视,也从另一方面说明观察者模式是多么重要。那么接下来我们开始用Java自带的来实现观察者模式的编码。

在这里我们以一个下单的场景来举例描述,用户创建完订单,要发送一个消息通知用户订单创建成功,还要发送一个消息给仓库,检查对应的订单商品。具体代码如下。

1. 继承java.util.Observable定义主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class OrderSubject extends Observable {

/**
* 订单状态发生变化通知其他,
*/
private String status;

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
// 通知其他
this.setChanged();
// 这里是可以携带一些额外的参数
this.notifyObservers();
}
}

2. 实现java.util.Observer接口定义观察者

java.util.Observer 是一个接口,接口中只有一个 update 方法。每当更改被观察对象时,都会调用此方法。 应用程序调用Observable对象的 notifyObservers方法,以将更改通知给所有对象的观察者。在这个例子中通知模块、仓库模块就是扮演观察者的角色,具体的代码实现如下:

  • 通知模块实现java.util.Observer接口
1
2
3
4
5
6
7
8
@Slf4j
public class NoticeObserver implements Observer {
@Override
public void update(Observable o, Object arg) {
OrderSubject orderSubject = (OrderSubject) o;
log.info("通知中心收到了订单的变化[{}],arg = [{}]", orderSubject.getStatus(),arg);
}
}
  • 仓库模块实现java.util.Observer接口
1
2
3
4
5
6
7
8
9
@Slf4j
public class WareHouseObserver implements Observer {
@Override
public void update(Observable observable, Object arg) {

OrderSubject orderSubject = (OrderSubject) observable;
log.info("仓库观察者收到了订单的变化[{}],arg = [{}]", orderSubject.getStatus(), arg);
}
}

3. 代码编写完成,测试一把

具体测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ObserverTest {

@Test
public void test(){
OrderSubject orderSubject = new OrderSubject();

// 通知系统监听者
NoticeObserver noticeObserver = new NoticeObserver();
orderSubject.addObserver(noticeObserver);

// 仓库监听者
WareHouseObserver wareHouseObserver = new WareHouseObserver();
orderSubject.addObserver(wareHouseObserver);
// 订单创建
orderSubject.setStatus("订单创建啦");

// 订单付款
orderSubject.setStatus("订单付款啦");
}
}

订单的状态发生了变化,对应的观察者就接受到了通知。JDK提供的实现很简单,但是缺陷也是很明显。

  • Observable 是一个类,不是一个接口,也不是一个抽象类,不继承该类无法使用权限类型为 protected 的setChanged()clearChanged()方法。

  • Vector<Observer>保存观察者。Vector效率低下,JDK已经不建议使用了。

  • 主题变化通知观察者的顺序无法自定义。通知的顺序是固定为按照设置观察者的时间倒叙通知,无法自定义。

  • 整体代码执行是同步的,不能设置为异步。且观察者的执行出现异常,程序处理不到位会导致事务回滚

既然JDK中实现不够完美,我们就看看有没有其他的实现方式。Google的Guava工具包提供了EventBus可以实现、Spring框架中也提供了相应的支持,由于我们使用Spring框架比较多,今天我们就先来说说Spring框架中是如何实现对观察者模式的支持吧!

Spring中的实现

Spring框架中提供了相当多的组件可以实现观察者模式,文章后面会一一向大家介绍。这里我引入一个新的业务场景,用户注册之后发送短信通知、积分还有优惠券。这种业务场景非常的常见,我展示一下小伙伴们一般是怎么样实现这样的功能的。因为篇幅有限,这里就展示一下伪代码吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MemberService{

@Autowired
private MemberDao memberDao;

@Autowired
private CouponService couponService;

@Autowired
private SmsService smsService;

/**
* 完成用户注册,成功之后发送优惠券、短信通知
* @param memberName 查询条件
* @param phone 分页信息
* @return
*/
public Result normalRegister(String memberName,String phone){
// 1、执行注册逻辑
log.info("[register][执行用户({}) 的注册逻辑]", phone);
// 保存用户
Long memberId = memberDao.saveMember();
// 2、发送优惠券
couponService.sendCoupon(memberId);
// 3、发送短信
smsService.sendSms(phone);

return new Result();
}
}

大家对这部分代码怎么看?

对于用户注册功能来说,保障用户成功注册,这是个业务的核心诉求,其他的都是锦上添花,所以这个地方不应该掺杂其他的业务。用户注册与发送短信、优惠券功能应该分隔开,这样也符合解耦的设计规则。不然后续业务增加需求,需要用户注册完成之后给用户发送积分,你是不是还要在这个 normalRegister方法中增加发送积分的逻辑,那这个岂不是又违背了开闭原则。

一件事做容易,想做好不容易。写程序也是如此,能够把功能编码实现bug又少,其实是很耗费时间和精力的。我常跟同事们开玩笑说“写代码不要糊弄,该写的代码一定要写,不然你最后肯定会被代码糊弄”。

接下来我来介绍一下如何使用Spring的事件发布机制来实现业务解耦的

由于后面的实现方法中用户注册事件类的代码都是类似的只是类名不同,那我就把用户注册事件类的代码写在前面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.springframework.context.ApplicationEvent;

@Slf4j
@Data
public class UserRegisterEvent extends ApplicationEvent {
/**
* 可以自定义些自己需要的属性
* 用户的id
*/
private Long memberId;

/**
* 用户手机号
*/
private String memberPhone;

public UserRegisterEvent(Object source) {
super(source);
}

public UserRegisterEvent(Object source, Long memberId, String memberPhone) {
super(source);
this.memberId = memberId;
this.memberPhone = memberPhone;
}
}
利用继承ApplicationEvent类、实现ApplicationListener接口或者 @EventListener注解来实现

org.springframework.context.ApplicationListener是Spring提供的一个泛型接口。由应用程序事件侦听器实现的接口,从Spring 3.0开始,ApplicationListener可以一般性地声明其感兴趣的事件类型。在Spring ApplicationContext中注册后,将相应地过滤事件,并且仅针对匹配事件对象调用侦听器。观察者可以实现ApplicationListener接口或者使用@EventListener注解,这个注解和ApplicationListener接口有着相同的功能,相比之下使用注解比使用接口更方便一点。

短信Service实现ApplicationListener接口

1
2
3
4
5
6
7
8
9
@Slf4j
@Service
public class SmsService {

@EventListener
public void sendSms(UserRegisterEvent event) {
log.info("[EventListener][给用户({}) 发送短信]", event);
}
}

优惠券Service直接在addCoupon 方法上添加注解 @EventListener

注意addCoupon 方法的入参为 UserRegisterEvent

1
2
3
4
5
6
7
8
9
@Slf4j
@Service
public class CouponService implements ApplicationListener<UserRegisterEvent> {

@Override
public void onApplicationEvent(UserRegisterEvent event) {
log.info("[addCoupon][给用户({}) 发放优惠劵]", event);
}
}

积分Service中的代码与上面的类似就不贴出来了

注册业务完成发布UserRegisterEvent事件

这个地方MemberService要实现org.springframework.beans.factory.Aware.ApplicationEventPublisherAware接口,才有能力使用 ApplicationEventPublisher 发布事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@Service
public class MemberService implements ApplicationEventPublisherAware {

private ApplicationEventPublisher applicationEventPublisher;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

/**
* 最简单的观察者实现
* @param phone 用户手机号
* @return
*/
public Result register(String memberPhone) {
// 1、 ... 执行注册逻辑
log.info("[register][执行用户({}) 的注册逻辑]", memberPhone);

// 2、用户数据保存到数据库
Long memberId = 1000L;

// 3、发布事件通知
applicationEventPublisher.publishEvent(new UserRegisterEvent(this, memberId, memberPhone));

return new Result();
}
}

单元测试

1
2
3
4
5
6
7
8
9
10
11
12
@ContextConfiguration(locations = {"classpath:spring/spring-dao.xml", "classpath:spring/spring-service.xml"})
@RunWith(value = SpringJUnit4ClassRunner.class)
public class ObserverTest {

@Autowired
private MemberService memberService;

@Test
public void register() {
memberService.register("13911111111");
}
}

执行结果

发送短信在发送优惠券之前

代码执行成功,打印出了给用户发送短信、优惠券日志,功能完美实现。但是稍微有点问题,我想调换一下两者的顺序,先执行发送优惠券,再发送短信。这个要求还是蛮常见,那怎么实现呢?

使用SmartApplicationListener或者GenericApplicationListener接口实现指定观察者自定义顺序执行

org.springframework.context.event.SmartApplicationListener是Spring3.0版本提供的接口,org.springframework.context.event.GenericApplicationListener则是Spring4.2版本提供的接口,如果在你的项目中使用不了,那就是说明你项目中使用的Spring版本过低,这个问题自己自行处理。
在查SmartApplicationListener源码时发现有以下注释:

1
2
3
4
5
6
Extended variant of the standard {@link ApplicationListener} interface,
exposing further metadata such as the supported event type.

<p>Users are <bold>strongly advised</bold> to use the {@link GenericApplicationListener}
interface instead as it provides an improved detection of generics-based
event types.

翻译之后大概的意思就是,SmartApplicationListener是标准ApplicationListener接口的扩展变体,公开了进一步的元数据,例如受支持的事件类型。但强烈建议改用GenericApplicationListener接口,因为它提供了对基于泛型的事件类型的改进检测,那我此处就只以GenericApplicationListener为例了。

优惠券服务Service同样实现接口 GenericApplicationListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
@Service
public class SecondVersionCouponService implements GenericApplicationListener {

/**
* 确定此侦听器是否实际上支持给定的事件类型。
* @param eventType
* @return
*/
@Override
public boolean supportsEventType(ResolvableType eventType) {
return SecondVersionUserRegisterEvent.class.equals(eventType.getRawClass());
}


/**
* 确定此侦听器是否实际上支持给定的源类型
* @param sourceType
* @return
*/
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return MemberService.class == sourceType;
}

/**
* 具体的业务逻辑
* @param event
*/
@Override
public void onApplicationEvent(ApplicationEvent event) {
SecondVersionUserRegisterEvent userRegisterEvent = (SecondVersionUserRegisterEvent) event;
Long memberId = userRegisterEvent.getMemberId();
log.info("[Second addCoupon][给用户({}) 发放优惠劵]", memberId);
}

/**
* 设定执行的顺序,数字越小优先级越高就越先被执行
* @return
*/
@Override
public int getOrder() {
return 0;
}
}

短信服务Service同样实现接口 GenericApplicationListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@Service
public class SecondVersionSmsService implements GenericApplicationListener {

@Override
public boolean supportsEventType(ResolvableType eventType) {

return SecondVersionUserRegisterEvent.class == eventType.getRawClass();
}

@Override
public boolean supportsSourceType(Class<?> sourceType) {
return MemberService.class == sourceType;
}

@Override
public void onApplicationEvent(ApplicationEvent event) {
SecondVersionUserRegisterEvent userRegisterEvent = (SecondVersionUserRegisterEvent) event;
String memberPhone = userRegisterEvent.getMemberPhone();
log.info("[Second onApplicationEvent][给用户({}) 发送短信]", memberPhone);

}

@Override
public int getOrder() {
return 10;
}
}

其实使用org.springframework.context.ApplicationListener + org.springframework.core.annotation.Order注解也可以实现控制观察者的执行顺序,而且更简洁!

注册业务完成发布UserRegisterEvent事件

这个地方MemberService要实现org.springframework.beans.factory.Aware.ApplicationEventPublisherAware接口,才有能力使用 ApplicationEventPublisher 发布事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
@Service
public class MemberService implements ApplicationEventPublisherAware {

private ApplicationEventPublisher applicationEventPublisher;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

/**
* 最简单的观察者实现
* @param phone 用户手机号
* @return
*/
@Transactional(rollbackFor = Exception.class)
public Result register(String phone) {
// 1、 ... 执行注册逻辑
log.info("[Second register][执行用户({}) 的注册逻辑]", phone);

// 2、 保存用户到数据库(假设这个是)
BizLocalMessageEntity entity = new BizLocalMessageEntity();
ThreadLocalRandom random = ThreadLocalRandom.current();
entity.setBizModule((byte) 0);
entity.setBizNo("" + random.nextInt(100000));
entity.setBizType((byte) 0);
entity.setSendStatus((byte) 0);
entity.setCreateTime(new Date());
entity.setUpdateTime(new Date());

int affectRow = bizLocalMessageMapper.insert(entity);
Long memberId = entity.getId();
log.info("插入的affectRow= [{}],主键[{}]", affectRow, memberId);

// 3、发布事件通知
applicationEventPublisher.publishEvent(new SecondVersionUserRegisterEvent(this, memberId, phone));

return new Result();
}
}

单元测试

1
2
3
4
5
6
7
8
9
10
11
12
@ContextConfiguration(locations = {"classpath:spring/spring-dao.xml", "classpath:spring/spring-service.xml"})
@RunWith(value = SpringJUnit4ClassRunner.class)
public class ObserverTest {

@Autowired
private MemberService memberService;

@Test
public void register() {
memberService.register("13911111111");
}
}

执行结果

运行截图

修改两个Service中order的数值可以轻松两个观察者的执行顺序,完美的解决了自定义观察者执行顺序的要求。细心的小伙伴会发现,假设在用户注册的方法中存在事务,观察者的执行出现异常,会导致整个事务回滚,明明用户的数据已经保存到数据库,现在却没有了!还是前面那句话,在这个业务场景,用户注册成功是核心诉求,发送短信、优惠券重要性都是比不上的。那要解决这种情况,就需要用户注册成功的事务提交之后,再发布注册成功事件,避免观察者执行结果的影响。

短信服务Service有有异常抛出导致事务回滚

1
2
3
4
5
6
7
8
9
10
@Override
public void onApplicationEvent(ApplicationEvent event) {
SecondVersionUserRegisterEvent userRegisterEvent = (SecondVersionUserRegisterEvent) event;
String memberPhone = userRegisterEvent.getMemberPhone();
if (StringUtils.isNotBlank(memberPhone)) {
throw new RuntimeException("测试事件中发生异常对原有业务的影响,短信发送失败!!!!");
}
log.info("[Second onApplicationEvent][给用户({}) 发送短信]", memberPhone);

}

短信观察者执行发生异常

可能有的小伙伴想,观察者执行异常导致事务回滚,本质上来说这种方式实现的观察者模式,其实还是在一个线程中同步执行的。那我就在观察者中捕获所有的异常或者用户注册成功之后异步发布事件。

  • 捕获所有异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void onApplicationEvent(ApplicationEvent event) {
try {

SecondVersionUserRegisterEvent userRegisterEvent = (SecondVersionUserRegisterEvent) event;
Long memberId = userRegisterEvent.getMemberId();

if (memberId != null) {
throw new RuntimeException("测试事件中发生异常对原有业务的影响,短信发送失败!!!!");
}
log.info("[Second addCoupon][给用户({}) 发放优惠劵]", memberId);
} catch (Exception e) {
log.error("发放优惠劵发生异常", e);
}

}
  • 异步发布事件
1
2
3
4
5
6
7
8
9

// 3、异步发布事件通知,这样的有可能事务还没有提交,则异步线程读取不到数据库中的数据
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
applicationEventPublisher.publishEvent(new UserRegisterEvent(this, memberId, phone));
});
voidCompletableFuture.exceptionally(exception -> {
log.error("发生了异常", exception);
return null;
});

第一种方式可以解决事务回滚的问题,但是呢感觉会有点low!

第二种方式假如观察者中查询会员的信息,可能出现查不到,因为可能事务还没有提交。

使用 @TransactionalEventListener 实现事务提交之后才执行观察者

使用起来很简单,直接在对应的观察者方法上添加该注解即可。@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreeCouponService {

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void addCoupon(ThreeUserRegisterEvent event) {

log.info("测试事件中发生异常对原有业务的影响");
Long memberId = event.getMemberId();
if (memberId != null) {
throw new RuntimeException("测试事件中发生异常对原有业务的影响,优惠券发送失败!!!!");
}
log.info(" Version=3 [addCoupon][给用户({}) 发放优惠劵]", memberId);
}
}

注意一定要在 register() 方法上添加 @Transactional 注解,不然 register() 方法中有抛出异常之后,会有下面的错误提示:

1
2
信息: Closing org.springframework.context.support.GenericApplicationContext@a67c67e: 
startup date [Thu Feb 04 18:06:53 CST 2021]; root of context hierarchy

@Transactional一定要加上不然方法体内有一次抛出会有上的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Transactional(rollbackFor = Exception.class)
public Result register(String phone) {
// 1、 ... 执行注册逻辑
log.info("[Second register][执行用户({}) 的注册逻辑]", phone);

// 2、 保存用户(假设这是)
int affectRow = bizLocalMessageMapper.insert(entity);
Long memberId = entity.getId();
log.info("插入的affectRow= [{}],主键[{}]", affectRow, memberId);
if (memberId != null) {
throw new RuntimeException("故意抛出异常");
}
applicationEventPublisher.publishEvent(new ThreeUserRegisterEvent(this, memberId, phone));
return new Result();
}

其实,实现 register() 事务提交之后,再发布事件还有两种方式:

  • 使用事务模板TransactionTemplate
  1. 使用之前需要在配置文件中配置事务模板
1
2
3
4
5
6
<!-- 配置 transactionTemplate -->
<bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager">
<ref bean="transactionManager"/>
</property>
</bean>
  1. 在代码中使用事务模板提交事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Autowired
private TransactionTemplate transactionTemplate;

public Result register(String phone) {
// 1、 ... 执行注册逻辑
log.info("[线程name={},register][执行用户({}) 的注册逻辑]", Thread.currentThread().getName(), phone);

// 2、 保存用户(假设这是)
Long memberId = transactionTemplate.execute(status -> {
Long result = null;
try {
BizLocalMessageEntity entity = new BizLocalMessageEntity();
ThreadLocalRandom random = ThreadLocalRandom.current();
entity.setBizModule((byte) 0);
entity.setBizNo("" + random.nextInt(200000));
entity.setBizType((byte) 0);
entity.setMsg("消息内容");
entity.setMsgDesc("消息备注");
entity.setCreateTime(new Date());
entity.setUpdateTime(new Date());

int affectRow = bizLocalMessageMapper.insert(entity);
result = entity.getId();
log.info("Version5 插入的affectRow= [{}],主键[{}]", affectRow, result);
if (affectRow > 0) {
//throw new RuntimeException("测试主要业务失败,事件是否会发布执行");
}
} catch (Exception e) {
log.error("业务失败发生异常", e);
// 标记事务回滚
status.setRollbackOnly();
}
return result;
});

log.info("事务执行情况 用户的id= [{}]", memberId);
if (memberId != null) {
// 3、异步发布事件通知
log.info("发布注册成功事件");
applicationEventPublisher.publishEvent(new UserRegisterEvent(this, memberId, phone));
}
return new Result();
}
  • 使用TransactionSynchronizationManager

这个可以直接使用无需配置,重写TransactionSynchronizationAdapter的afterCommit()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

@Transactional(rollbackFor = Exception.class)
public Result register(String phone) {
// 1、 ... 执行注册逻辑
log.info("[Version4 register][执行用户({}) 的注册逻辑]", phone);

// 2、 保存用户(假设这是)
BizLocalMessageEntity entity = new BizLocalMessageEntity();
ThreadLocalRandom random = ThreadLocalRandom.current();
entity.setBizModule((byte) 0);
entity.setBizNo("" + random.nextInt(100000));
entity.setBizType((byte) 0);
entity.setMsg("");
entity.setMsgDesc("");
entity.setCreateTime(new Date());
entity.setUpdateTime(new Date());

Long memberId = entity.getId();
int affectRow = bizLocalMessageMapper.insert(entity);
log.info("Version4 插入的affectRow= [{}],主键[{}]", affectRow, memberId);
if (affectRow > 0) {
//throw new RuntimeException("Version4 测试主要业务失败,事件是否会发布执行");
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
super.afterCommit();
// 3、事务提交之后发布事件通知
applicationEventPublisher.publishEvent(new UserRegisterEvent(this, memberId, phone));
}
});
return new Result();
}

我们把目前的代码执行一下看看情况怎么样

执行日志截图

从截图上看,我们目前已经通过好几种方式解决了观察者执行异常导致事务回滚的问题,但是也可以从截图中看出用户注册逻辑和对应的观察者都是在main线程中执行的,那我们可以让观察者异步执行吗?这样也可以加快代码的执行速度。这个问题当然也是可以解决的,接下来我们来解决这个问题:

  • 直接使用@Async注解

需要注意的是,使用这个注解一定要自定义线程池,不是它默认使用的是 org.springframework.core.task.SimpleAsyncTaskExecutor 。但这个 SimpleAsyncTaskExecutor 不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13

public class SixCouponService {

@Async
@Order(20)
@EventListener
public void addCoupon(SixUserRegisterEvent event) {

log.info("测试事件中发生异常对原有业务的影响", "");
Long memberId = event.getMemberId();
log.info("线程name={},[addCoupon][给用户({}) 发放优惠劵]", Thread.currentThread().getName(), memberId);
}
}
  • 启动异步线程来发布事件

意思就是事务提交之后,判断事务提交成功则新建一个线程然后发布注册成功事件。事务提交成功可以前面提到的事务模板transactionTemplate和事务管理器TransactionSynchronizationManager。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

public Result register(String phone) {
// 1、 ... 执行注册逻辑
log.info("[线程name={},register][执行用户({}) 的注册逻辑]", Thread.currentThread().getName(), phone);

// 2、 保存用户(假设这是)
Long memberId = transactionTemplate.execute(status -> {
Long result = null;
try {
BizLocalMessageEntity entity = new BizLocalMessageEntity();
ThreadLocalRandom random = ThreadLocalRandom.current();
entity.setBizModule((byte) 0);
entity.setBizNo("" + random.nextInt(200000));
entity.setBizType((byte) 0);
entity.setMsg("消息内容");
entity.setMsgDesc("消息备注");
entity.setCreateTime(new Date());
entity.setUpdateTime(new Date());

int affectRow = bizLocalMessageMapper.insert(entity);
result = entity.getId();
log.info("Version5 插入的affectRow= [{}],主键[{}]", affectRow, result);
} catch (Exception e) {
log.error("业务失败发生异常", e);
// 标记事务回滚
status.setRollbackOnly();
}
return result;
});

log.info("事务执行情况 用户的id= [{}]", memberId);
if (memberId != null) {
// 3、异步发布事件通知
log.info("发布注册成功事件");
CompletableFuture.runAsync(()->{
applicationEventPublisher.publishEvent(new SixUserRegisterEvent(this, memberId, phone));
});
}
return new Result();
}

代码修改之后执行,运行截图如下:

运行截图
我们可以发现代码分别main、taskExecutor-1和taskExecutor-2 三个线程执行的,完美的解决了上面提到的问题。

使用观察者模式之前的用户注册流程:

优化后的注册流程

使用观察者模式之后的用户注册流程:

优化后的注册流程
总结

用户数据写入数据表成功,发布注册成功事件,通知相关的短信、优惠券、积分模块。用观察者模式实现之后,注册逻辑和其他的逻辑分离开,即使后续新加一个邮件通知或者去掉一个发送积分的操作,只是修改了对应的代码逻辑,不用回滚测试所有场景。这符合对修改关闭对扩展开放的规则,有效的降低了各个模块之间耦合度,使得代码的可维护性大大提高。有些同学可能会抱怨,自己的项目简单用不到这些看似高大上的设计模式,其实不然,你只是没有认真研究自己的项目,还有就是对自己的写的代码没有太高的要求。如果你看到了本文,而且看到此处,你回想一下你所做的项目,难道没有一个业务场景满足观察者模式条件么?你想想是不是!

  • 作者: Sam
  • 发布时间: 2021-02-04 23:49:32
  • 最后更新: 2021-02-06 17:07:26
  • 文章链接: https://ydstudios.gitee.io/post/5b3d1989.html
  • 版权声明: 本网所有文章除特别声明外, 禁止未经授权转载,违者依法追究相关法律责任!