Skip to content
killme2008 edited this page Apr 10, 2012 · 15 revisions

#作者

Iteye上的网友netcomm,感谢。

#Meta的client实现分析

由于meta不像activeMQ等产品,它们的broker端承载了非常多的功能,而像meta这样追求性能为目的的消息中间件,则是把broker端的功能弱化,同时加强了client端的某些功能,如当前client的消息offset的存储、从broker中pull消息等。

下面我们从消息pull这样一个client端最重要的功能作为分析的主线来了解meta中client的实现。

下面是client端执行pull消息的处理流程示意图:

meta1

1.通过ZKLoadRebalanceListener的rebalance方法,根据该client所订阅topic的分区数量来初始化对应数量的FetchRequest实例,并把它们放到FetchRequestQueue中(先进先出)。

2.消息抓取管理器初始化fetchRunners线程池,并启动所有线程对FetchRequestQueue进行读请求的操作。

3.当FetchRequestQueue中有请求时,则执行FetchRequestRunner线程中的processRequest方法,进行后续的操作。

4.通过SimpleMessageConsumer(消息消费者基类)的fetch方法从broker端获取某topic的某分区的消息数据byte组。

5.对broker端返回的消息数据byte组进行操作,解析出一条条消息,并对这些消息进行消费,具体代码实现在FetchRequestRunner类的notifyListener方法,它是消息消费的核心方法,后面我们会重点介绍。

6.FetchRequestRunner处理完一次从broker获取消息并消费的过程后,会把FetchRequest实例重新放回FetchRequestQueue中,重复进行下一轮操作。

上面是client端pull消息的主过程,由于meta的client涉及的功能也较多,为了更进一步了解client端的实现细节,我们从下面几个方面做更进一步的分析

##pull模型的轮训时间 很多用户在一看到消息的获取是通过client端主动pull的方式,就感觉和activeMQ等其他消息中间件所采用的broker主动推送消息到client的方式相比较,实时性有所降低。但通过对meta源码的分析,发现它的实时性还是可以保证的,具体实现方法分析如下:

轮训时间的控制在FetchRequestQueue类的take方法中的如下几行代码:

final long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
      final long tl = this.available.awaitNanos(delay);
}

delay值的设置是按照下面规则进行的:

meta2

  • 初始化为0,所以第一次该请求会立即和broker端通讯,以获得消息组。
  • 大部分情况下pull的轮训间隔为0,所以它的实时性还是可以保证的。

#Meta使用zookeeper的细节 meta对zookeeper非常依赖,而且重要的信息同步都是通过它完成的,官方文档对这一块的说明也不多,所以,我们对meta的分析就从这开始,当我们清晰meta中zookeeper的使用细节后,meta的内部实现原理也就基本清晰了。

meta里使用zookeeper最典型的就是类ZKLoadRebalanceListener的 rebalance()方法,该方法使用到了很多zookeeper的znode,下面我们对这个方面做一个详细介绍:

rebalance方法是用来计算某消息消费者具体应该消费哪个meta服务器节点上的哪些分区中的消息(也就是“消费者的负载平衡”)。该方面在下面两种情况下被触发:

  • 监视同一个消费者分组的consumer列表是否有变化;
  • 监视订阅的topic下的broker节点是否有新增或删除;

当上面两种情况发生时,zookeeper的机制会自动触发到rebalance方法,它的具体算法如下:

1.从zookeeper的'/meta/consumers/某消费者组/ids/某消费者id'节点上获取该消费者的所有订阅了的主题;

2.从zookeeper的'/meta/brokers/ids'节点上获取所有的broker列表;

3.从zookeeper的'/meta/consumers/某消费者组/ids/'、'/meta/consumers/某消费者组/ids/某消费者id'这两个节点中获得每个topic下有哪些消费者;

4.从zookeeper的'/meta/brokers/topics/某topic'节点获得对应topic在broker(包括:master和slaves)里有哪些partition;

5.从上面获得的所有这些信息,调用类ZKLoadRebalanceListener中getRelevantTopicMap方法,判断最新的partition列表或consumer列表和当前在用的是否有变化,如果没有变化则再补充做一个动作(因为虽然partition和consumer都没有新增或删除,但可能cluster的结构发生变化了):对集群做一个比较,看是否有机器down或新增;如果没有变化则继续进行后续rebalance操作;

6.如果经过上面的操作确认相关partition列表或consumer列表有变化,则根据'负载均衡策略'获取某个consumer对应的partition列表,然后根据之前老的和最新的partition列表做相关操作,如新分区在zookeeper上的挂载、释放等操作;

上面是client端一个比较重要的功能,也是zookeeper在meta里一个用的较多的地方。

#Meta的HA HA是任何进入生产环境的软件都需要考虑的一个重要因素,meta提供两种HA的方式:同步和异步(推荐使用异步方式)。

官方提供的文档里已经很清晰说明了HA的场景,这里我们再补充一些官方文档中未详细说明的部分:

  • meta采用冷备方式来实现HA,任何主、备的切换都得重启相关broker。
  • 任何时候只有master可以执行写操作;实现代码在BrokerConnectionListener类的syncedUpdateBrokersInfo方法,该方法从zookeeper上同步最新的master列表供producer使用。
  • client可以连接cluster上的任何一台机器包括master和slave,作为它的消息来源,类似数据库的读写分离。
  • 如果master因为某种原因当机,则必须手动停止某台slave并对它进行相关配置操作,并启动它使它成为新的master。当那台坏了的master修复后,它将作为一台新的slave加入集群。
  • 在master上不停机的情况下新增一个topic,这个新增的topic不能自动的同步到slave上,必须通过某种方式把master上的server.ini同步到所有slave上,然后通过人工重启或通过jmx重启slave。

##同步HA的切换过程:

生产环境中有一台master和一个同步slave,slave是不注册到zookeeper上的,当master当机,则所有连接到该broker的生产者和消费者都停止正常工作。然后人工停止slave,并新增samsa_master.properties配置文件,修改其中recoverOffset属性为true。并且修改server.ini中的brokerId为故障master的id,这些修改做完后,重启slave。这样它就成为新的master对外提供服务了。

#Meta的transaction实现

对事务的支持是meta的一个重要特点,目前它支持XA和本地事务,下面我们详细对XA事务进行分析,由于本地事务相对简单,可以参考XA的实现。

##分布式事务(XA) ###分布式事务介绍

分布式事务在分布式应用中是非常重要的,目前分布式事务的实现标准是XA,而在java体系中就是JTA标准。下面是XA的一个示意图:

meta3

这里我们重点说一下两阶段提交协议(2PC),它是XA的核心思想,具体示意图如下:

meta2

具体更多关于XA的细节请参考XA的接口规范。

###meta的实现 meta里事务的实现主要是如下几个类:

  • TransactionalCommandProcessor:事务命令处理器。它主要作用是接收client端发过来的各种请求,如beginTransaction、prepare、commit、rollback、新增消息等。
  • JournalTransactionStore:基于文件方式的事务存储引擎。
  • JournalStore:具体存储事务的文件存储类。

它们3者之间的关系是:TransactionalCommandProcessor接收client端的各种事务请求,然后调用JournalTransactionStore进行事务存储,JournalTransactionStore根据不同的client请求调用JournalStore具体保存事务信息到磁盘文件。

下面通过一个示意图来进一步进行说明:

meta10

client通过调用beginTransaction来新开始一个事务(在meta里就是一个Tx实例),并把它放在JournalTransactionStore类的inflightTransactions队列里,然后client就可以在这个Tx中新增消息,但这些新增的消息是放在JournalStore文件里,并且完整的保存在内存中(由于meta目前没有专门的内存管理机制,当事务数量特别大的时候,这个地方有可能会出现内存溢出)。当client进行2PC中的prepare时,事务从inflightTransactions队列移到preparedTransactions队列,并保存相关信息到JournalStore。当执行commit时,该Tx的所有消息才真正放到MessageStore里供消息消费者读取。当client端发起rollback请求后,Tx被从preparedTransactions队列中删除,并保存相关信息到JournalStore。

下面我们对meta事务实现的几个重要方面做一个详细介绍:

####beginTransaction

当client端的TransactionContext(XAResource的实现)调用start方法,broker接收到请求后,启动一个新事务。 ####新增消息

处理序列图如下:

image

当事务begin后,client端向broker发送多条消息存储的请求,broker收到请求后会调用JournalTransactionStore的addMessage方法。该方法把请求存储在事务日志文件中(JournalStore),同时新建或找到对应的Tx实例,把这些消息存储请求保存在内存中。这里注意一点,在事务没有提交之前,这些消息存储是不会被放到对应topic消息存储文件中去的。 ####prepare的处理过程

处理序列图如下:

img

prepare的处理过程相对简单些,它只是把Tx实例从JournalTransactionStore类的inflightTransactions中移除到preparedTransactions中,同时在事务日志文件存储相关信息。 ####commit的处理过程

处理序列图如下:

img

commit过程相当复杂点。broker收到client端的commit请求,调用JournalTransactionStore的commit方法,从preparedTransactions里找到对应的Tx,把该Tx里的所有请求命令(PutCommand),按照topic和分区分别保存到真正的topic消息存储文件中去,当全部保存完时,就会通过回调类AppendCallback的appendComplete方法记录commit日志到事务日志文件。 ####recover的处理过程

处理序列图如下:

img

recover操作发生在系统重启的时候,主要是为了还原系统上一次停止时候的事务场景,如还原处在prepare阶段的事务,rollback所有本地事务和没有prepare的XA事务。recover的处理细节包括两部分:

  • 在JournalTransactionStore的构造函数中进行JournalStore的recover操作

  • JournalStore的recover主要是完成从事务日志文件中按照最近的checkpoint从日志中读取所有的日志记录,并按照记录的类型APPEND_MSG和TX_OP分别进行还原操作: #####APPEND_MSG类型 这种类型的日志记录就调用JournalTransactionStore的addMessage方法,但是不会往日志文件中重复记录该消息了。 #####TX_OP类型 这种类型的处理相当复杂点。它根据日志记录的类型又细分为下面几种

  • XA_PREPARE:根据TransactionId把对应的Tx实例从JournalTransactionStore类的inflightTransactions中移到preparedTransactions中。

  • XA_COMMIT和LOCAL_COMMIT:根据TransactionId从JournalTransactionStore类的inflightTransactions或preparedTransactions中找到对应的Tx实例。把该Tx内的所有消息请求对比相应topic消息存储文件中消息,如果topic消息存储文件中不存在这些消息则新增,如果存在则通过crc32校验码进行比对。

  • LOCAL_ROLLBACK和XA_ROLLBACK:根据TransactionId把对应的Tx实例从JournalTransactionStore类的inflightTransactions或preparedTransactions中删除。

在TransactionalCommandProcessor的init方法中调用JournalTransactionStore类的recover操作

经过上面的recover操作后,它已经把meta重启前的事务现场在JournalTransactionStore和JournalStore中进行了还原。接下来就是TransactionalCommandProcessor类的事务现场还原,这个过程是把JournalTransactionStore类的preparedTransactions中的所有Tx在TransactionalCommandProcessor中进行还原,该过程相对简单,可参考源码实现。

经过上面这些recover步骤后,meta作为XAResource就可以继续加入XA事务了。

Clone this wiki locally