MySQL ClusterをOpenMQ ClusterのHAストアに使用する

こんにちは。今回はオープンソースのメッセージキューであるOpen Message Queue(OpenMQ)のHAストアにMySQL Clusterを使う方法について書いてみたいと思います。(Connector/J設定の参考にもなるかと思います)

OpenMQはオープンソースアプリケーションサーバGlassFishのサブプロジェクトとして進められているMQを開発するプロジェクトです。エンタープライズシステム・コンポーネントであるOpenESBや、GlassFish Enterprise Serverなどのコアモジュールとして使用されている大変重要なソフトウエアとなっています。疎結合化システムを構築するためには無くてはならない存在で、システムによっては非常に高い可用性を求められる部分です。Myはうす(まい・はうす改め)ではOpenMQについての色々なトピックを書いて来ましたがいよいよ本命の登場です。

OpenMQのHAクラスタについてはこちらで説明していますのでよろしければご覧ください。

以下のトピックでは、OpenMQのクラスタでSunのHADBを使用する手順を書いています。
HADBを使用したHAクラスタ構成その1
HADBを使用したHAクラスタ構成その2

[4/17日Update]
奇しくも、4/16日にOpenMQクラスタについてのWebcastがリリースされました。brokerとproducer,consumer,永続ストレージとメッセージデリバリの関係がすごくわかりやすいです。Cluster構成にした場合の動きなどが特に参考になりますね(動画です!!)http://www.sun.com/offers/details/message_queue_ha_clusters.html

MySQL Clusterを使用した設定を行う場合も、HADBの時と流れは一緒ですので参考にしてください。
アーキテクチャ図を一枚引用します。

MQ4.1HAクラスタ・アーキテクチャとなっていますが、基本的にMQ4.3HAクラスタは4.1のものと同一のアーキテクチャとなっています。複数のMQのブローカがクラスタを構成し、同一のシェアドHAストアを利用します。少しおさらいしてみたいと思います。MySQL Clusterとの設定方法だけを見たい方は飛ばしてください。

[ホームブローカ]

MQのクライアント(JMSクライアントなど)が最初に通信を行ったブローカのことを「ホームブローカ」と呼びます。JMSクライアントのロードバランス機能などにより各クライアントのホームブローカはバラバラになります。OpenMQのHAクラスタでは従来の「コンベンショナルクラスタ」と同様に、どのブローカからもメッセージを取り出すことができる仕組みとなっています。(コンベンショナルでは、永続化メッセージは各ブローカがローカルに持っています)

例えばbroker1に対してsendしたメッセージはbroker2からも同様にreceiveすることが可能です。ただしOpenMQのHAクラスタでは単純に同一のデータ・セッション・コネクションを全brokerが共有する形にはなっていません。ホームブローカでないブローカ経由でメッセージがリクエストされた場合、ブローカがHAストアから他のブローカが管理しているメッセージを探して、クライアントに返す形となります。
#JMSのBrowserでキューの中を覗いた場合ホームブローカ以外にはメッセージがないように見えます。これは覗いた時点ではメッセージがそのブローカに所属していないためです。

[ホームブローカがダウンした場合]

ホームブローカがダウンした場合、上記のような構造となっているのでそのままでは他のブローカからのメッセージリクエストに応えられません。broker間では事前定義された間隔でお互いにハートビートを確認しています。ハートビートチェックがタイムアウトしそのブローカがダウンしていると判断された場合には、ブローカが管理している資源(セッション、トランザクション、メッセージ)のフェイルオーバが発生します。ダウンしたブローカが持っていた資源をHAストアから引継ぎ先にテイクオーバします。ブローカリストを持ったクライアント内でブローカへの接続も自動でフェイルオーバしますので、クライアントはMQのダウンを関知する必要がありません。

[全体としてHAにするための考慮点]

HAクラスタ・アーキテクチャ図の黄色枠内については、各モジュールがHA化されている必要があります。さもなければそこが単一障害点(Single Point of Failure)となってしまいます。冗長化するポイントは以下です。

・OpenMQブローカインスタンス
・HAストア

ブローカインスタンスについては、ブローカインスタンスを複数並行に並べることで解決可能です。ブローカインスタンスを複数並べ、全体として1つの仮想インスタンスとして取り扱うことをMQクラスタと呼んでいます。HAストアについてはMySQLを使用する場合、単体のインスタンスではMySQLサーバプロセスがダウンした場合そこが単一障害点になります。今回はMySQL Clusterを使用します。(単体のMySQLインスタンスも使用可能です)

[HAストアに使用するMySQL Clusterの考慮点]

・ 複数のデータノードを起動
・ 複数のSQLノードを起動
・ 各ブローカからSQLノードに分散/フェイルオーバするように設定

今回MySQL Clusterは説明のため最小構成で設定を行います。(本番適用時には要件に応じて適宜構成設計を行ってください)今回使用する環境のndb_mgmでの構成リストを以下に載せます。
使用するOpenMQのバージョンはここからダウンロードできる4.3を使用しました。

-- NDB Cluster -- Management Client --
ndb_mgm> show
Connected to Management Server at: 192.168.70.128:1186
Cluster Configuration
---------------------
[ndbd(NDB)] 2 node(s)
id=2 @192.168.70.128 (mysql-5.1.30 ndb-6.3.20, Nodegroup: 0, Master)
id=3 @192.168.70.128 (mysql-5.1.30 ndb-6.3.20, Nodegroup: 0)

[ndb_mgmd(MGM)] 1 node(s)
id=1 @192.168.70.128 (mysql-5.1.30 ndb-6.3.20)

[mysqld(API)] 2 node(s)
id=4 @192.168.70.128 (mysql-5.1.30 ndb-6.3.20)
id=5 @192.168.70.128 (mysql-5.1.30 ndb-6.3.20)

[OpenMQ Cluster with MySQL Cluster設定方法]

1.MySQL Clusterを起動し、mysqlクライアントでSQLノードのどちらかに接続してopenmqデータベースを作成します。

Server version: 5.1.30-ndb-6.3.20-cluster-gpl MySQL Cluster Server (GPL)

Type 'help;' or '\h' for help. Type '\c' to clear the buffer.

mysql> create database openmq character set utf8;
Query OK, 1 row affected (0.23 sec)

MQで大きなサイズのメッセージを扱う予定の場合、MySQLの以下のパラメータを設定しておきます。最大サイズによって値を調整してください。

config.ini
[tcp default]
SendBufferMemory=2M
ReceiveBufferMemory=1M

この値を設定しておかないと、JMSクライアントの仕様によってはNDB-APIエラー(SEND BUFFERS OVERLOADED IN NDB KERNEL)となり送信できないことがあります(ストリームバッファを多めに取っている場合など)。デフォルト値はそれぞれ256k,64kです。

my.cnf
[mysqld]
max_allowed_packet = 16M

なお、何も指定していないとデフォルト1Mとなります(最大指定可能は1GB,それ以上の数字を設定しても1GBになる)。それ以上大きなメッセージを保存しようとすると以下のようなエラーとなってしまいます。これより大きなサイズのメッセージを扱いたい場合はByteMessageを使って複数のメッセージに分割する方法もあります。

[15/4/2009:23:01:46 JST] エラー [B2011]: 16-192.168.11.9(f8:22:94:e5:d3:14)-3584-1239778905890 からの JMS メッセージの保存に失敗しました:
com.sun.messaging.jmq.jmsserver.util.BrokerException: [B4004]: メッセージ 16-192.168.11.9(f8:22:94:e5:d3:14)-3584-1239778905890 を維持できませんでした
at com.sun.messaging.jmq.jmsserver.persist.jdbc.MessageDAOImpl.insert(MessageDAOImpl.java:416)
at com.sun.messaging.jmq.jmsserver.persist.jdbc.MessageDAOImpl.insert(MessageDAOImpl.java:340)
at com.sun.messaging.jmq.jmsserver.persist.jdbc.JDBCStore.storeMessage(JDBCStore.java:239)
at com.sun.messaging.jmq.jmsserver.persist.jdbc.JDBCStore.storeMessage(JDBCStore.java:201)
at com.sun.messaging.jmq.jmsserver.core.PacketReference.store(PacketReference.java:1177)
at com.sun.messaging.jmq.jmsserver.core.Queue.routeNewMessage(Queue.java:542)
at com.sun.messaging.jmq.jmsserver.data.handlers.DataHandler.routeMessage(DataHandler.java:462)
at com.sun.messaging.jmq.jmsserver.data.handlers.DataHandler.handle(DataHandler.java:244)
at com.sun.messaging.jmq.jmsserver.data.handlers.DataHandler.handle(DataHandler.java:97)
at com.sun.messaging.jmq.jmsserver.data.PacketRouter.handleMessage(PacketRouter.java:181)
at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.readData(IMQIPConnection.java:1489)
at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.process(IMQIPConnection.java:644)
at com.sun.messaging.jmq.jmsserver.service.imq.OperationRunnable.process(OperationRunnable.java:170)
at com.sun.messaging.jmq.jmsserver.util.pool.BasicRunnable.run(BasicRunnable.java:493)
at java.lang.Thread.run(Thread.java:619)
Caused by: java.sql.SQLException: [INSERT INTO MQMSG41Cmqhamysqlcluster ( ID, MESSAGE_SIZE, STORE_SESSION_ID, DESTINATION_ID, TRANSACTION_ID, CREATED_TS, MESSAGE) VALUES ( ?, ?, ?, ?, ?, ?, ? )]: Packet for query is too large (2713149 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.
at com.sun.messaging.jmq.jmsserver.persist.jdbc.DBManager.wrapSQLException(DBManager.java:863)
at com.sun.messaging.jmq.jmsserver.persist.jdbc.MessageDAOImpl.insert(MessageDAOImpl.java:411)
... 14 more



また、OpenMQ自体にも取り扱えるメッセージの最大バイト数の設定がありますのでそれらの値も調整します。-1は無制限を意味します。OpenMQ側で制限をかけたい場合はこの値を正の数値に設定します。maxBytesPerMsgとmaxTotalMsgBytesのデフォルトは10MB、imq.message.max_sizeは70MBとなっています。

imq.autocreate.destination.maxBytesPerMsg=-1
imq.autocreate.destination.maxTotalMsgBytes=-1
imq.message.max_size=-1

※autocreate宛先の場合、一度作成されてしまうと上の値の設定を行っても反映しないので注意してください。その場合は一度宛先を再作成すると新しい値が反映します。

2.MySQL Connector/J(MySQL用JDBCドライバ)をここからダウンロードします。ダウンロードしたzipファイルを解凍すると、mysql-connector-java-5.0.8-bin.jarという名前のJARが出てきますので、これをMQ_HOME\lib\extフォルダにコピーしておきます。
※OpenMQではConnector/J 5.1ではなく、Connector/J 5.0を使用します。5.1をHAストアとして利用すると以下のようになってしまい、ブローカを起動することが出来ません。

com.sun.messaging.jmq.jmsserver.util.BrokerException: [B4221]: ストアからすべての HA クラスタ情報を読み込めませんでした
at com.sun.messaging.jmq.jmsserver.persist.jdbc.BrokerDAOImpl.getAllBrokerInfos(BrokerDAOImpl.java:1077)
at com.sun.messaging.jmq.jmsserver.persist.jdbc.JDBCStore.getAllBrokerInfos(JDBCStore.java:2764)
at com.sun.messaging.jmq.jmsserver.persist.Store.getAllBrokerInfos(Store.java:1212)
at com.sun.messaging.jmq.jmsserver.cluster.ha.HAClusterManagerImpl$HAMap.updateHAMapForState(HAClusterManagerImpl.java:1498)
at com.sun.messaging.jmq.jmsserver.cluster.ha.HAClusterManagerImpl$HAMap.updateHAMap(HAClusterManagerImpl.java:1485)
at com.sun.messaging.jmq.jmsserver.cluster.ha.HAClusterManagerImpl.getKnownBrokers(HAClusterManagerImpl.java:486)
at com.sun.messaging.jmq.jmsserver.cluster.ha.HAClusterManagerImpl.getConfigBrokers(HAClusterManagerImpl.java:511)
at com.sun.messaging.jmq.jmsserver.service.HAMonitorService.<init>(HAMonitorService.java:437)
at com.sun.messaging.jmq.jmsserver.Broker._start(Broker.java:869)


これは以下のConnector/Jの問題のためです。(これが解決すれば利用可能だと思います)
http://bugs.mysql.com/bug.php?id=41484

3.broker1とbroker2を以下のコマンドでスタートし、初期設定ファイルを生成させます。

imqbrokerd -name broker1 -port 7676

imqbrokerd -name broker2 -port 7677 

4. MQ_VARHOME/instances/以下のbroker1ディレクトリに、broker2ディレクトリが生成されています。その直下にpropsというディレクトリがありそこにインスタンス用コンフィグファイルが入っています。
これを以下のように編集します。

broker1のconfig.properties

imq.instanceconfig.version=300
imq.brokerid=broker1
imq.cluster.ha=true
imq.cluster.clusterid=mqhamysqlcluster
imq.persist.store=jdbc
imq.persist.jdbc.dbVendor=mysql
imq.persist.jdbc.mysql.property.url=jdbc:mysql://192.168.70.128:3308,192.168.70.128:3307/openmq
imq.persist.jdbc.mysql.user=root
imq.persist.jdbc.mysql.needpassword=false
imq.persist.jdbc.mysql.property.autoReconnect=true
imq.persist.jdbc.mysql.property.failOverReadOnly=false
imq.persist.jdbc.mysql.property.autoReconnectForPools=true
imq.persist.jdbc.mysql.property.maxReconnects=1
imq.persist.jdbc.mysql.property.roundRobinLoadBalance=true
imq.persist.jdbc.mysql.tableoption=ENGINE=NDBCLUSTER
imq.persist.jdbc.connection.limit=15
imq.autocreate.destination.maxBytesPerMsg=-1
imq.autocreate.destination.maxTotalMsgBytes=-1
imq.message.max_size=-1

broker2のconfig.properties

imq.instanceconfig.version=300
imq.brokerid=broker2
imq.cluster.ha=true
imq.cluster.clusterid=mqhamysqlcluster
imq.persist.store=jdbc
imq.persist.jdbc.dbVendor=mysql
imq.persist.jdbc.mysql.property.url=jdbc:mysql://192.168.70.128:3307,192.168.70.128:3308/openmq
imq.persist.jdbc.mysql.user=root
imq.persist.jdbc.mysql.needpassword=false
imq.persist.jdbc.mysql.property.autoReconnect=true
imq.persist.jdbc.mysql.property.failOverReadOnly=false
imq.persist.jdbc.mysql.property.autoReconnectForPools=true
imq.persist.jdbc.mysql.property.maxReconnects=1
imq.persist.jdbc.mysql.property.roundRobinLoadBalance=true
imq.persist.jdbc.mysql.tableoption=ENGINE=NDBCLUSTER
imq.persist.jdbc.connection.limit=15
imq.autocreate.destination.maxBytesPerMsg=-1
imq.autocreate.destination.maxTotalMsgBytes=-1
imq.message.max_size=-1

重要なプロパティについて補足説明したいと思います。

imq.brokerid=broker2
ブローカごとにユニークにする必要があります。
ブローカ名と同じに設定すると良いでしょう。

imq.cluster.ha=true
imq.cluster.clusterid=mqhamysqlcluster
OpenMQをHAクラスタで稼動させるために必須となります。同じクラスタ内に入るブローカは全て同じ値を指定する必要があります。

imq.persist.jdbc.mysql.property.url (Connector/J属性:url)
urlでは接続対象のホスト:ポート番号をカンマ区切りで書きます。
Connector/Jの仕様ではurlに?prop=value&prop=valueという書式が可能ですが、ブローカ設定では使用できないので注意してください(無視されます)。その代わりに、imq.persist.jdbc.mysql.property.+プロパティ名=値と書けばその値を反映することが出来ます。Connector/JをAPサーバなどから使う場合はプロパティ名=値のみを設定すればOKです。

imq.persist.jdbc.mysql.property.autoReconnect=true (Connector/J属性:autoReconnect)
imq.persist.jdbc.mysql.property.failOverReadOnly=false (Connector/J属性:failOverReadOnly)
フェイルオーバのための設定です。特にfailOverReadOnly=falseを忘れるとフェイルオーバ後のMySQLサーバに対して一切更新SQLが使えなくなりますので注意が必要です。

imq.persist.jdbc.mysql.property.autoReconnectForPools=true (Connector/J属性:autoReconnectForPools)
接続プールを使用している場合に追加すべきプロパティです。これがtrueとなっているとプールから取得した接続を使用しSQLを実行する時に、その接続が前回SQLクエリ実行に失敗している場合接続テスト(内部PING)が行われます。pingが返ってこない場合、接続が内部的に更新(リコネクト)されます。

imq.persist.jdbc.mysql.property.maxReconnects=1 (Connector/J属性:maxReconnects)
ホストリストの次のホストにFailOverする前に試行する再接続回数です。

imq.persist.jdbc.mysql.property.roundRobinLoadBalance=true (Connector/J属性:roundRobinLoadBalance)
ホストリストに対してラウンドロビン方式でアクセスを行います。

imq.persist.jdbc.mysql.tableoption=ENGINE=NDBCLUSTER
OpenMQのHAストアテーブルをNDBCLUSTERストレージエンジンでCREATEします。
MySQL ClusterをHAストアに設定する場合にMUSTな定義です。

imq.persist.jdbc.connection.limit=15
OpenMQでは、MySQLにアクセスを行うために内部的に次のクラスを使用しています。
・com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource
ブローカ起動時にここで設定した数の接続が作成され、接続プールに保持されます。
デフォルトは5です。mysqlクライアントのshow processlistコマンドでブローカからいくつの接続が張られているかを確認可能です。

mysql> show processlist;
+-----+-------------+-------------------+--------+---------+------+-----------------------------------+------------------+
| Id | User | Host | db | Command | Time | State | Info |
+-----+-------------+-------------------+--------+---------+------+-----------------------------------+------------------+
| 1 | system user | | | Daemon | 0 | Waiting for event from ndbcluster | NULL |
| 243 | root | localhost | NULL | Query | 0 | NULL | show processlist |
| 412 | root | 192.168.70.1:3255 | openmq | Sleep | 54 | | NULL |
| 423 | root | 192.168.70.1:3312 | openmq | Sleep | 64 | | NULL |
| 424 | root | 192.168.70.1:3316 | openmq | Sleep | 59 | | NULL |
| 425 | root | 192.168.70.1:3320 | openmq | Sleep | 54 | | NULL |
| 426 | root | 192.168.70.1:3324 | openmq | Sleep | 48 | | NULL |
| 427 | root | 192.168.70.1:3328 | openmq | Sleep | 43 | | NULL |
| 428 | root | 192.168.70.1:3334 | openmq | Sleep | 38 | | NULL |
| 429 | root | 192.168.70.1:3339 | openmq | Sleep | 32 | | NULL |
| 430 | root | 192.168.70.1:3343 | openmq | Sleep | 27 | | NULL |
| 431 | root | 192.168.70.1:3348 | openmq | Sleep | 22 | | NULL |
| 432 | root | 192.168.70.1:3352 | openmq | Sleep | 16 | | NULL |
| 433 | root | 192.168.70.1:3361 | openmq | Sleep | 11 | | NULL |
| 434 | root | 192.168.70.1:3366 | openmq | Sleep | 6 | | NULL |
| 435 | root | 192.168.70.1:3374 | openmq | Sleep | 0 | | NULL |
+-----+-------------+-------------------+--------+---------+------+-----------------------------------+------------------+
limit - 1の数が用意されます。

5. broker1とbroker2を停止します。初期ユーザIDとパスワードはadmin/adminとなります。

C:\OpenMQ\mq\bin>imqcmd shutdown bkr -b localhost:7676
ユーザ名: admin
パスワード: admin
指定されたブローカの停止中:

---------------------
ホスト プライマリポート
---------------------
localhost 7676

このブローカを停止してもよろしいですか ? (y/n)[n] y

localhost:7676 でのブローカの停止を待機しています...
ブローカの停止に成功しました。


C:\OpenMQ\mq\bin>imqcmd shutdown bkr -b localhost:7677
ユーザ名: admin
パスワード: admin
指定されたブローカの停止中:

---------------------
ホスト プライマリポート
---------------------
localhost 7677

このブローカを停止してもよろしいですか ? (y/n)[n] y

localhost:7677 でのブローカの停止を待機しています...
ブローカの停止に成功しました。

6. broker1とbroker2を起動します。

C:\OpenMQ\mq\bin>imqbrokerd -name broker1 -port 7676
[14/4/2009:21:26:03 JST]
==================================================================
Open Message Queue 4.3
Sun Microsystems, Inc.
バージョン: 4.3 (Build 7-g)
コンパイル: Fri 11/07/2008

Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. Use is
subject to license terms.
==================================================================
Java ランタイム: 1.6.0_12 Sun Microsystems Inc. C:\jdk1.6.0_12\jre
[14/4/2009:21:26:03 JST] IMQ_HOME=C:\OpenMQ\mq
[14/4/2009:21:26:03 JST] IMQ_VARHOME=C:\OpenMQ\var\mq
[14/4/2009:21:26:03 JST] Windows XP 5.1 x86 PSQ96040 (2 cpu) takemura
[14/4/2009:21:26:03 JST] Java ヒープサイズ: 最大=189376k、現在=18176k
[14/4/2009:21:26:03 JST] 引数: -name broker1 -port 7676
[14/4/2009:21:26:03 JST] [B1060]: 持続データの読み込み中...
[14/4/2009:21:26:05 JST] プラグイン持続ストアの使用:
version=410
brokerid=broker1
database connection url=jdbc:mysql://192.168.70.128:3307,192.168.70.128:3308/openmq
database user=root
[14/4/2009:21:26:05 JST] 警告 [B4239]: データベーステーブル MQVER41Cmqhamysqlcluster から持続ストアバージョンを読み込めませんでした:
java.sql.SQLException: [SELECT STORE_VERSION FROM MQVER41Cmqhamysqlcluster]: Table 'openmq.MQVER41Cmqhamysqlcluster' doesn't exist
[14/4/2009:21:26:05 JST] [B1114]: 持続的ストアが新規に作成されます...
[14/4/2009:21:26:05 JST] データベーステーブル MQVER41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:06 JST] データベーステーブル MQBKR41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:07 JST] データベーステーブル MQSES41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:08 JST] データベーステーブル MQPROP41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:10 JST] データベーステーブル MQCREC41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:11 JST] データベーステーブル MQCON41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:13 JST] データベーステーブル MQCONSTATE41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:14 JST] データベーステーブルのインデックス MQCONSTATE41CmqhamysqlclusterIDX1 を作成しています...
[14/4/2009:21:26:16 JST] データベーステーブル MQDST41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:18 JST] データベーステーブルのインデックス MQDST41CmqhamysqlclusterIDX1 を作成しています...
[14/4/2009:21:26:19 JST] データベーステーブル MQMSG41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:21 JST] データベーステーブルのインデックス MQMSG41CmqhamysqlclusterIDX1 を作成しています...
[14/4/2009:21:26:22 JST] データベーステーブル MQTXN41Cmqhamysqlcluster を作成しています...
[14/4/2009:21:26:25 JST] データベーステーブルのインデックス MQTXN41CmqhamysqlclusterIDX1 を作成しています...
[14/4/2009:21:26:29 JST] [B1039]: ブローカ "broker1@PSQ96040:7676" の準備ができました。

(broker2の起動ログは省略します)
初回起動時に関連するテーブルが存在しなければ自動的に作成されます。

MySQLテーブルは以下のようになります。

mysql> show table status;
+-------------------------------+------------+---------
| Name | Engine | Version
+-------------------------------+------------+---------
| MQBKR41Cmqhamysqlcluster | ndbcluster | 10
| MQCON41Cmqhamysqlcluster | ndbcluster | 10
| MQCONSTATE41Cmqhamysqlcluster | ndbcluster | 10
| MQCREC41Cmqhamysqlcluster | ndbcluster | 10
| MQDST41Cmqhamysqlcluster | ndbcluster | 10
| MQMSG41Cmqhamysqlcluster | ndbcluster | 10
| MQPROP41Cmqhamysqlcluster | ndbcluster | 10
| MQSES41Cmqhamysqlcluster | ndbcluster | 10
| MQTXN41Cmqhamysqlcluster | ndbcluster | 10
| MQVER41Cmqhamysqlcluster | ndbcluster | 10
+-------------------------------+------------+---------

7. Broker起動状態の確認 (imqcmd list bkr)

C:\OpenMQ\mq\bin>imqcmd list bkr
ユーザ名: admin
パスワード: admin
次のブローカが所属するクラスタ内のすべてのブローカをリストします:

---------------------
ホスト プライマリポート
---------------------
localhost 7676

クラスタ ID mqhamysqlcluster
クラスタは高可用性です true

--------------------------------------------------------------------------
ブローカの ID 前回からの時間
ブローカ ID アドレス 状態 ストア内のメッセージ 継承を実行しています 状態のタイムスタンプ
--------------------------------------------------------------------------
broker2 PSQ96040:7677 OPERATING 0 5 minutes
broker1 PSQ96040:7676 OPERATING 0 5 minutes

クラスタ内のブローカのリストに成功しました。

8.送受信のテスト
QK2もしくはQBrowserV2などでメッセージの送受信テストを行います。
Myはうすオリジナルツールの使い方は以下をご覧ください。
送受信のテスト
QBrowserV2(QK2の後継ツール)

QBrowserV2を使ってテストしてみます。接続先のポート番号を変更する場合は.batファイルの最後の部分を変更してください。
新規メッセージ→新規メッセージ作成を選択します。

宛先名をTESTにし、Message bodyに「broker1へ送ったメッセージ」と入力し送信ボタンを押します。

broker1(localhost:7676)にメッセージが入っていることが確認できます。

もう1つQBrowserV2を起動してbroker2(localhost:7677)に接続します。QUEUE名にTESTと入力してBrowseボタンを押しても何も出てこないことを確認します。[宛先コマンド]→[全宛先情報リスト]またはimqcmd list dst -b localhost:7677 -t q -n TESTコマンドを実行してキューの中を見てみます。

0になっていることが確認できます。ここで、broker1を強制シャットダウンし情報をbroker2に引き継がせてみたいと思います。broker1をシャットダウンするとすぐにbroker2がダウンを検知します。

[15/4/2009:22:23:56 JST] [B1214]: ブローカ broker1 が応答しません。継承を試みています
[15/4/2009:22:23:56 JST] brokerStatusChanged broker1:
BrokerStatus[LINK_DOWN:IN_DOUBT:UP]
BrokerStatus[LINK_DOWN:NOT_INDOUBT:DOWN]
null
[15/4/2009:22:23:56 JST] [B1183]: Unsuspect broker1
[15/4/2009:22:23:56 JST] [B1184]: Unsuspected /192.168.11.9:7676 [brokerID=broker1, brokerSession=8722663960158640128] (seq#=0, ts=1239774656921, i
[15/4/2009:22:23:56 JST] [B1186]: broker1 へのハートビートを停止します
[15/4/2009:22:23:56 JST] [B1182]: ハートビートエンドポイント /192.168.11.9:7676 [brokerID=broker1, brokerSession=8722663960158640128] (seq#=0, ts=1
[15/4/2009:22:23:56 JST] [B1216]: ブローカ broker1 のストアの継承を試みています
[15/4/2009:22:23:56 JST] [B1175]: このブローカはブローカ [brokerID=broker1, storeSession=8722663960158640128] を継承しようとしています
[15/4/2009:22:23:56 JST] [B1168]: 障害の発生したブローカ broker1 の継承ロックが取得されました (タイムスタンプ: 1239774656921)
[15/4/2009:22:23:56 JST] state = FAILOVER_PENDING broker1
[15/4/2009:22:23:56 JST] state = FAILOVER_STARTED broker1
[15/4/2009:22:23:57 JST] ブローカ broker1 から 0 個のローカル送信先を継承中です: []
[15/4/2009:22:23:57 JST] ブローカ broker1 から 1 個のメッセージを継承中です
[15/4/2009:22:23:57 JST] ブローカ broker1 から 0 個のトランザクションを継承中です
[15/4/2009:22:23:57 JST] ブローカ broker1 から 0 個のリモートトランザクションを継承中です
[15/4/2009:22:23:57 JST] ブローカ broker1 から 3 個のストアセッションを継承中です: [7125574942207197952, 8722663960158640128, 2586509467726251264]
[15/4/2009:22:23:57 JST] state = FAILOVER_COMPLETE broker1
[15/4/2009:22:23:57 JST] [B1217]: ブローカ broker1 の継承が成功しました。そのブローカが所有するデータと状態情報を処理しています
[15/4/2009:22:23:57 JST] [B1218]: ブローカ broker1 からの 0 トランザクションを処理しています
[15/4/2009:22:23:57 JST] [B1297]: ブローカ broker1 から 0 個のリモートトランザクションを処理しています
[15/4/2009:22:23:57 JST] monitoring 0 transactions
[15/4/2009:22:23:57 JST] [B1283]: 継承後に 0 のトランザクションを処理しています
[15/4/2009:22:23:57 JST] [B1221]: ブローカ broker1 からの 1 メッセージを処理しています
[15/4/2009:22:23:57 JST] Loading message 23-192.168.11.9(ff:e4:53:af:d7:4f)-2513-1239774044828 on Q:TEST
[15/4/2009:22:23:57 JST] attempting to set Message Bytes Limit to 10485760K for destination TEST [キュー]
[15/4/2009:22:23:57 JST] attempting to set Message Count Limit to 100000 for destination TEST [キュー]
[15/4/2009:22:23:57 JST] attempting to set Message Size Limit to 10240K for destination TEST [キュー]
[15/4/2009:22:23:57 JST] Loading Stored destination TEST [キュー] connectionUID=null
[15/4/2009:22:23:57 JST] [B1151]: 1 メッセージとともに送信先 TEST [キュー] を読み込み
[15/4/2009:22:23:57 JST] [B1152]: 送信先 TEST [キュー] の読み込み完了
[15/4/2009:22:23:57 JST] [B1196]: 送信先 TEST [キュー] で 1 メッセージを処理します
[15/4/2009:22:23:57 JST] Message 23-192.168.11.9(ff:e4:53:af:d7:4f)-2513-1239774044828 has 1 consumers
[15/4/2009:22:23:57 JST] loadTakeoverMsgs: lookup 23-192.168.11.9(ff:e4:53:af:d7:4f)-2513-1239774044828:[consumer:0, type=NONE] found tid=null
[15/4/2009:22:23:57 JST] non-transacted: Routing Message 23-192.168.11.9(ff:e4:53:af:d7:4f)-2513-1239774044828 to 1 consumers:[consumer:0, type=NON
[15/4/2009:22:23:57 JST] [B1223]: broker1 の継承処理が完了しました
[15/4/2009:22:23:57 JST] [B1177]: ブローカ [brokerID=broker1, storeSession=8722663960158640128] のブロードキャスト継承が完了しました...
[15/4/2009:22:23:57 JST] Processing G_TAKEOVER_COMPLETE(41)


broker2のTESTキューを見てみるときちんとメッセージが継承されて来ているのがわかります。

上記はあくまでメッセージの継承動作の説明のためのものです。broker1をわざわざdownさせてbroker2へ継承させずともJMSクライアントは普通にbroker1のメッセージをbroker2からレシーブすることが可能です。以下のようなプログラムを作成して試してみます。プログラムを実行する前にbroker1(localhost:7676)に対してメッセージを送っておきます。(broker1,broker2共に生きている状態です)

com.sun.messaging.ConnectionFactory conFactory = new com.sun.messaging.ConnectionFactory();
conFactory.setProperty(ConnectionConfiguration.imqAddressList, "localhost:7677");

javax.jms.QueueConnection tcon = conFactory.createQueueConnection("admin","admin");
tsession = tcon.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
tcon.start();

javax.jms.Queue queue = tsession.createQueue("TEST");
MessageConsumer receiver = tsession.createConsumer(queue);
TextMessage tmessage = (TextMessage)receiver.receive();

receiver.close();
tsession.close();
tcon.close();

[結果]
broker2へ接続してもbroker1のメッセージが取得できました。

broker1に送ったメッセージですよ。

9. MySQL ClusterのSQLノードがダウンした場合
SQLノードは2重化してありますので、MQの稼動に問題は生じません。(Connector/Jのフェイルオーバ機能で活動中のSQLノードに自動でコネクトされます。フェイルオーバ中にアクセスしてきたJMSクライアントには例外が返されます。これはアプリ側で処理する必要があります。設定によってフェイルオーバ完了までに時間の差が生じますので必要に応じて調整を行ってください。以下のメッセージがブローカログに出てくればSQLノードのフェイルオーバが完了しています。

jdbc:mysql://192.168.70.128:3308,192.168.70.128:3307/openmq へのデータベース接続が再確立されました。

MySQL ClusterやOpen MQに関するご相談はSUN-PSまでお寄せください。設計、構築、アプリ開発、技術支援などに既に多数のお客様よりご好評いただいております。MySQLメニューについてはこちらをご覧ください。