建模领域事件
让我们看看敏捷项目管理上下文中的一条需求:
允许将每一个待定项提交到冲刺中。只有在待定项位于发布计划中时.才能进行提交。如果待定项已经提交到了另外的冲刺中.必须先将其回收才能进行新的提交。提交待定项时.通知对应的冲刺和相关兴趣方。
在建模领域事件时,我们应该根据限界上下文中的通用语言来命名事件及其属性。如果事件由聚合上的命令操作产生,那么我们通常根据该操作方法的名字来命名领域事件。对于上面的例子,当我们向一个冲刺提交待定项时,我们将发布与之对应的领域事件:
命令方法:BacklogItem#commitTo(Sprint aSprint)
事件输出:BacklogltemCommitted
事件的名字表明了聚合上的命令方法在执行成功之后所发生的事情:“待定项提交完毕。”当然,我们还可以创建更详细的事件名字,比如BacklogltemCommittedToSprint。但是,在Scrum的通用语言中,待定项只能提交到冲刺中。换句话说,待定项是不能提交到发布中的。因此,使用原先的BacklogltemCommitted已经足够了,并且更加简捷。如果你倾向于使用更详细的事件命名,也是可以的,这只是一个选择问题。
在聚合发布事件时,请注意我们应该使事件的名字反映过去发生的事情,即该事件并不是当前发生的,而是先前发生的。
在有了正确的事件名后,我们还需要什么样的事件属性呢?首先,我们需要一个时间戳来表示事件发生的时间。在Java中,可以使用java.util.Date类来表示。
package com.saasovation.agilepm.domain.model.product;
public class BacklogItemCommitted implements DomainEvent {
private Date occurredOn;
...
}
所有的领域事件都将实现DomainEvent接口,该接口定义了一个occurredOn() 方法:
package com.saasovation.agilepm.domain.model;
import java.util.Date;
public interface DomainEvent {
public Date occurredOn();
}
接下来,团队成员还需要考虑其他有意义的属性。考虑一下,是谁导致了领域事件的产生。这通常包括产生该领域事件的聚合和其他参与操作的聚合,也有可能是其他任何类型的数据属性。
分析之后.我们可以得到以下BacklogltemCommitted事件:
package com.saasovation.agilepm.domain.model.product;
public class BacklogItemCommitted implements DomainEvent {
private Date occurredOn;
private BacklogItemId backlogItemId;
private SprintId committedToSprintId;
private TenantId tenantId;
...
}
团队成员认为.Backlogltem和Sprint的身份标识对于 BacklogltemCommitted事件来说是最关键的。Backlogltem是事件的发起方.而Sprint则是事件的参与方。当然.他们还讨论了更多的话题。该需求特别指出.当Backlogltem被提交到Sprint之后,该Sprint应该得到通知。因此,位于同一个限界上下文中的事件订阅方应该及时地通知Sprint,但前提条件是BacklogltemCommitted事件中存在Sprintld.
此外,在一个多租户环境中.记录Tenantld也是有必要的,虽然Tenamld不会作为参数传给命令方法.但是它却是本地和远程限界上下文所必需的。在本地上下文中.我们需要Tenantld来查询Backlogltem和Sprint。同样,在远程上下文中,我们需要Tenantld来查出领域事件的作用对象。
我们如何建模由事件提供的行为操作呢?通常来说,这是非常简单的,因为领域事件通常都被设计成不变的。事件所携带的属性能够反映出该事件的来源。多数事件的构造函数都只允许全状态初始化,同时,事件对象还提供了访问不同属性的getter方法。
基于此.ProjectOvation团队做了以下实现:
package com.saasovation.agilepm.domain.model.product;
public class BacklogItemCommitted implements DomainEvent {
...
public BacklogItemCommitted(
TenantId aTenantId,
BacklogItemId aBacklogItemId,
SprintId aCommittedToSprintId) {
super();
this.setOccurredOn(new Date());
this.setBacklogItemId(aBacklogItemId);
this.setCommittedToSprintId(aCommittedToSprintId);
this.setTenantId(aTenantId);
}
@Override
public Date occurredOn() {
return this.occurredOn;
}
public BacklogItemId backlogItemId() {
return this.backlogItemId;
}
public SprintId committedToSprintId() {
return this.committedToSprintId;
}
public TenantId tenantId() {
return this.tenant;
}
...
}
在该事件发布时.本地上下文的订阅方可以用该事件来通知相应的Sprint:
MessageConsumer.instance(messageSource, false)
.receiveOnly(
new String[] { "BacklogItemCommitted" },
new MessageListener(Type.TEXT) {
@Override
public void handleMessage(
String aType,
String aMessageId,
Date aTimestamp,
String aTextMessage,
long aDeliveryTag,
boolean isRedelivery)
throws Exception {
// first de-duplicate message by aMessageId
// 第一条消重之后的消息,以aMeesageld标定
...
// get tenantId, sprintId, and backlogItemId from JSON
...
Sprint sprint =
sprintRepository.sprintOfId(tenantId, sprintId);
BacklogItem backlogItem =
backlogItemRepository.backlogItemOfId(
tenantId,
backlogItemId);
sprint.commit(backlogItem);
}
});
根据系统需求,在处理了BacklogltemCommitted消息之后,Sprint与刚才所提交的Backlogltem达到了最终一致性。我们将在本章后续内容中讨论订阅方是如何接收领域事件的。
团队成员意识到.这种方式还存在一个小问题。Sprint如何处理更新事务呢?我们可以让消息处理器来处理事务。但是.无论如何我们都需要相应地重构代码。最好的方式是将事务处理委派给应用服务(14).这是一种很自然的选择.同时这种方式能够很好地融入六边形架构(4)中。如此一来,代码将变成:
MessageConsumer.instance(messageSource, false)
.receiveOnly(
new String[] { "BacklogItemCommitted" },
new MessageListener(Type.TEXT) {
@Override
public void handleMessage(
String aType,
String aMessageId,
Date aTimestamp,
String aTextMessage,
long aDeliveryTag,
boolean isRedelivery)
throws Exception {
// get tenantId, sprintId, and backlogItemId from JSON
String tenantId = ...
String sprintId = ...
String backlogItemId = ...
ApplicationServiceRegistry
.sprintService()
.commitBacklogItem(
tenantId, sprintId, backlogItemId);
}
});
在上面的例子中,我们没有必要消除对事件的重复提交,因为向Sprint提交 Backlogltem是一个幂等操作。如果某个Backlogltem已经提交给了Sprint,当再次提交时,Sprint将予以忽略。
除了事件的来源信息,如果订阅方还需要进行更多的操作,那么我们可以向事件中添加额外的状态和行为。这样,订阅方便不用回头再对聚合进行查询,而只需要对所接收到的事件进行查询即可。富有行为和状态的领域事件在事件源中更加常见,因为那些需要持久化并进而发布到外部限界上下文的领域事件需要更多的额外状态,请参考附录A。
白板时间
- 列出你领域中已经存在但是还未被捕获的领域事件。
- 想想如何将这些事件显现在自己的领域模型中。
最容易识别出的便是当一个聚合依赖于另外一个聚合的时候,此时我们需要保证它们之间的最终一致性。
正如在值对象(6)中所讨论的,我们需要确保这些额外的事件行为是无副作用的,这样可以保证对象的不变性。
创建具有聚合特征的领域事件
有时,领域事件并不由聚合中的命令方法产生,而是直接由客户方所发出的请求产生。此时,领域事件可以建模成一个聚合,并且可以拥有自己的资源库。但是,又由于领域事件表示的是发生在过去的事情,因此资源库是不能对事件进行删除的。
和聚合一样,由这种方式所创建的事件应该成为模型结构的一部分。因此,它们不再仅仅表示过去发生的事情。
此时的领域事件依然应该设计成不变的,但是它们将拥有唯一标识。对于领域事件而言,我们可以使用事件属性来表示唯一标识。然而,即便事件的唯一标识可以由一组属性来决定,最好的方式还是采用生成的唯一标识,请参考实体(5)。这样,如果设计有变化,我们依然可以保证事件的唯一性。
由这种方式所创建的事件可以通过消息设施进行分发,同时又可以将其添加到资源库中。客户方可以通过调用领域服务(7)来创建事件,然后将其添加到资源库中,再通过消息设施进行发布。在这种情况下,资源库和消息设施必须使用相同的持久化实例(数据源),或者使用全局事务(即XA和两阶段提交),以此来保证对事件的成功提交。
在消息设施成功存储事件之后,它将异步地将事件发送给消息队列监听器、话题订阅方或者Actor Model1中的Actor等。如果消息设施所使用的存储和模型所使用的存储是分离的,并且消息设施不支持全局事务,那么在调用领域服务时,事件必须已经存在于消息存储中。消息转发组件将对消息存储中的每一个事件进行处理,然后通过消息设施将事件发布出去。对此,我们将在本章后续内容做详细讨论。
1. 请参考Erlang和Scala的Actor Model。在使用Scala或Java时,可以特别关注一下Akka。 ↩
身份标识
这里,我们再讨论一下领域事件为什么需要唯一标识。有时,我们需要对不同的事件进行区分。在创建、发布事件的限界上下文中,我们几乎没有理由对不同事件进行比较。但是,如果我们的确需要对不同的事件进行比较,我们应该怎么办呢?再者,如果此时的事件被设计成了聚合,我们又该怎么办呢?
对于领域事件来说,使用属性来表示唯一标识似乎已经足够了,就像值对象一样。使用事件的名字、产生事件的聚合标识和事件时间戳已经足以对不同的事件进行区分了。
当领域事件被建模成了聚合;或者我们需要对不同的事件进行比较,但是事件的属性又不足以区分事件时,我们便需要为事件创建唯一标识。当然,还有其他的原因。
当我们需要将领域事件发布到外部限界上下文中时,为事件创建唯一标识也是有必要的。在有些情况下,单条消息可能会被多次分发,比如,在消息设施确定消息发出之前,消息发布器便瘫痪了。
不管是什么原因导致了对消息的重新分发,消息订阅方都需要检查出重复的消息,并且将其忽略掉。为了达到这样的目的,有些消息设施在消息头中加人了唯一性的消息标识,此时我们自己的领域模型是不能生成这样的标识的。即便消息设施不会自动地向消息中加人唯一标识,消息的发送方也会向事件本身或者消息中加人这样的标识信息。不管采用哪种方法,远程的订阅方都有机会知道一条消息是否是重复发送的。
有必要为领域事件提供equals()和hashCode()方法吗?有,但是通常来说,只有当事件用于本地限界上下文中时,我们才这么做。对于通过消息设施发送的事件,有时订阅方接收的并不是事件对象本身,而是以XML、JSON或键值对等表示的事件数据。另一方面,当一个事件被设计成聚合并且保存在资源库中时,那么事件应该为这些数据展现形式提供相应的方法支持。