Cometメッセージハンドラチュートリアル

CometBroker

CometBroker クラスは、mpchat_server で CometMessageHandler を利用している部分である。 定義は次のようになっている:

class CometBroker: public nine::CometMessageHandler
{
   public:
      bool initialize(ChatService*, const char* path);

      virtual void turn();

      virtual bool handleMessage(nine::Message* pMsg);
      virtual void onSessionOpen(int sid);
      virtual void onSessionClose(int sid);

   private:
      void notifyLog(int sid);
      ChatService *m_pService;
      struct Comm {
            inline Comm() { }
            int sid;
            bool connected;
            int lastIndex;
      };
      std::vector< Comm > m_comms;
};

HttpServer への登録

CometBroker は HttpMessageHandler であり、HttpHandler である。 HttpHandler は、HttpServer の authorizeHttp() 関数の戻り値としてフレームワークに渡すことで利用されるようになる。

この処理は Server の initialize() 内で実装されている:
bool Server::initialize(const ServerConfig* pCfg)
{
      ...
      if (! m_cometBroker.initialize(&m_service, "/comet/")) {
         return false;
      }
      m_httpServer.setHandler("/comet/", &m_cometBroker);

m_httpServer は PathHttpServer である。 setHandler() により、CometBroker は "/comet/" で始まるリクエストパスに対応するよう設定される。

一方、CometMessageServer では、"/u" で終わるパスをアップロード用に、"/d" ダウンロード用に割り当てる。

これらの組み合わせにより、"/comet/u" がアップロード用、"/comet/d" がダウンロード用のパスになる。

initialize

initialize() では、親クラスである CometMessageHandler の初期化を行っている。
bool CometBroker::initialize(ChatService* pService, const char* path)
{
   {
      nine::CometMessageHandlerConfig cfg;
      cfg.nSessions = 10;
      cfg.sessionTimeoutSec = 10;
      cfg.sessionCookie = "sid";
      cfg.sessionCookiePath = path;
      cfg.jsonp = "jsonp";
      cfg.enableQueryStringMarshal = true;
      cfg.enablePostMarshal = true;
      typedef nine::CometMessageHandler super;
      if (! super::initialize(&cfg)) { return false; }
   }

   m_comms.resize(10);
   for (int i=0; i<m_comms.size(); ++i) {
      m_comms[i].connected = false;
   }

   m_pService = pService;
   return true;
}
上のコードは具体的には次のような設定を行っている:
  • Cometセッションの同時接続数は 10
  • Cometセッションは cookie を用いる。cookie のパスは引数として与えられる。
  • 要求のメッセージは QueryString と HTTP POST の両方の形式を受け付ける。
  • 応答は jsonp 形式で行い、その関数名は "jsonp" である。
CometMessageHandlerConfig の項目は MessageHandlerConfig と同じである。

ここで、initialize() の引数として与えられる cookie パスは "/comet/" である。 クライアントが要求するパスは "/comet/u" と "/comet/d" であるが、どちらも "/comet/" の cookie の適用範囲になっている。どちらの接続でも同一の cookie を使われることになり、結果として同じセッションID が指定されてくる。

notifyLog()

notifyLog() は private 関数であり、このクラスの他の関数から呼ばれる。 処理内容は、ChatService からログを取得し、LogNotify メッセージに入れて post することである。

void CometBroker::notifyLog(int cid)
{
   LogNotify msg;
   m_pService->get(m_comms[cid].lastIndex, &msg);

   int n = msg.get_chat_length();
   if (n == 0) {
      return;
   }
   if (! post(cid, &msg)) {
      return;
   }
   
   m_comms[cid].lastIndex = msg.get_chat(n-1).get_index();
}

ChatService::get では、受け取るログの最初の番号を指定できる。 CometBroker はこの番号を lastIndex に記憶しておき、get に渡している。

notifyLog() はダウンロードチャネルが切断されている時も呼ばれる。 この時は post() が失敗し、サンプルコードでは何もせずに関数から出る。 post() 成功時は、最後に送信したメッセージ番号を更新している。

handleMessage()

handleMessage() は、要求に含まれていたメッセージを処理するハンドラである。

Comet はサーバープッシュが実現できるので、プロトコル設計のうち LogRequest は使われない。 サンプルでは、ChatRequest のみをを処理している:
   int sid = pMsg->getCommunicatorId();
   uint16_t msgid = pMsg->getMessageId();
   if (msgid == ChatRequest::_MESSAGE_ID) {
      ...
   } else if (msgid == LogRequest::_MESSAGE_ID) {
      return false;
   }

ChatRequest は、クライアントが新規発言をした際に送られるメッセージである。 ChatRequest は同時に発言ログの要求も行う。

サーバー側では、
  • 発言を発言ログへ追加する
  • 新しいログを返す
という処理を行う。コードは次のようになる:
   if (msgid == ChatRequest::_MESSAGE_ID) {
      ChatRequest* p = static_cast< ChatRequest* >(pMsg);
      m_pService->add(p->get_chat());
      notifyLog(sid);
      return true;
CometBroker は最後に送信したログを記憶しているので、ChatRequest のログ番号指示は無視している。