热搜:NVER node 开发 php

php 连接zookeeper实例

2024-08-13 11:35:01
php 连接zookeeper实例

1、成功安装 ZooKeeper 后,在 ZooKeeper 的 bin 目录下可以找到相应的启动脚本

启动Server

                      ./zkServer.sh start

启动 Client:(*注:运行 zkCli.sh 需要先安装 Java)

                     ./zkCli.sh

2、PHP实例:

<?php

/**
 * Minimal watcher demo: keeps a watch armed on the /test znode.
 */
class ZookeeperDemo extends Zookeeper
{
    /**
     * Watch callback. Prints a marker line and immediately re-registers
     * itself on /test.
     *
     * The three arguments are supplied by the extension when the watch
     * fires; this demo does not use them.
     */
    public function watcher($i, $type, $key)
    {
        echo "Insider Watcher\n";

        // A watch is consumed once it fires, so arm a fresh one right away.
        $this->get('/test', [$this, 'watcher']);
    }
}

// Connect to the local ZooKeeper server and arm the first watch on /test.
$zoo = new ZookeeperDemo('127.0.0.1:2181');
$zoo->get('/test', [$zoo, 'watcher']);

// Loop forever, printing a heartbeat dot every two seconds.
for (;;) {
    echo '.';
    sleep(2);
}

leader 与 worker 任务的分配(以下代码保存为 worker.php):

/**
 * Leader-election worker built on the ZooKeeper PHP extension.
 *
 * Every worker registers an ephemeral, sequential znode under CONTAINER.
 * The worker whose znode sorts first is the leader. Every other worker sets
 * a watch on the znode immediately before its own and re-evaluates its role
 * whenever that watch fires.
 */
class Worker extends Zookeeper
{
    /** Parent znode under which all workers register. */
    const CONTAINER = '/cluster';

    /** ACL applied to every znode this class creates: all permissions for world:anyone. */
    protected $acl = [
        [
            'perms'  => Zookeeper::PERM_ALL,
            'scheme' => 'world',
            'id'     => 'anyone',
        ],
    ];

    /** Whether this worker currently considers itself the leader. */
    private $isLeader = false;

    /** Name of this worker's znode relative to CONTAINER; set by register(). */
    private $znode;

    /**
     * Forwards all arguments unchanged to the Zookeeper constructor.
     *
     * @param string        $host         Connection string, e.g. "127.0.0.1:2181".
     * @param callable|null $watcher_cb   Optional watcher callback passed to the parent.
     * @param int           $recv_timeout Value passed to the parent as its receive timeout.
     */
    public function __construct($host = '', $watcher_cb = null, $recv_timeout = 10000)
    {
        parent::__construct($host, $watcher_cb, $recv_timeout);
    }

    /**
     * Creates this worker's znode and determines its initial role.
     *
     * Creates CONTAINER first when it does not exist yet, then an
     * EPHEMERAL | SEQUENCE child named "w-<sequence>", and finally either
     * claims leadership or starts watching the preceding worker.
     */
    public function register()
    {
        if (!$this->exists(self::CONTAINER)) {
            $this->create(self::CONTAINER, null, $this->acl);
        }

        $path = $this->create(
            self::CONTAINER . '/w-',
            null,
            $this->acl,
            Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE
        );

        // Keep only the child name so it can be compared with getChildren() entries.
        $this->znode = str_replace(self::CONTAINER . '/', '', $path);

        printf("I'm registred as: %s\n", $this->znode);

        $watching = $this->watchPrevious();

        if ($watching === $this->znode) {
            printf("Nobody here, I'm the leader\n");
            $this->setLeader(true);
        } else {
            printf("I'm watching %s\n", $watching);
        }
    }

    /**
     * Locates this worker among the sorted children of CONTAINER.
     *
     * When another worker sorts before this one, a watch (callback:
     * watchNode) is set on that predecessor and its name is returned.
     * When this worker sorts first, its own name is returned.
     *
     * @return string Name of the watched predecessor, or this worker's own name.
     *
     * @throws Exception When this worker's znode is not among the children.
     */
    public function watchPrevious()
    {
        $workers = $this->getChildren(self::CONTAINER);
        sort($workers);
        $size = count($workers);

        for ($i = 0; $i < $size; $i++) {
            if ($this->znode === $workers[$i]) {
                if ($i > 0) {
                    $this->get(
                        self::CONTAINER . '/' . $workers[$i - 1],
                        [$this, 'watchNode']
                    );

                    return $workers[$i - 1];
                }

                return $workers[$i];
            }
        }

        throw new Exception(
            sprintf(
                "Something went very wrong! I can't find myself: %s/%s",
                self::CONTAINER,
                $this->znode
            )
        );
    }

    /**
     * Watch callback for the predecessor znode.
     *
     * Re-runs watchPrevious(); becomes leader when no predecessor is left,
     * otherwise reports the newly watched predecessor. The three arguments
     * are supplied by the extension and are not used here.
     */
    public function watchNode($i, $type, $name)
    {
        $watching = $this->watchPrevious();

        if ($watching === $this->znode) {
            printf("I'm the new leader!\n");
            $this->setLeader(true);
        } else {
            printf("Now I'm watching %s\n", $watching);
        }
    }

    /**
     * @return bool Current leader flag.
     */
    public function isLeader()
    {
        return $this->isLeader;
    }

    /**
     * @param bool $flag New value for the leader flag.
     */
    public function setLeader($flag)
    {
        $this->isLeader = $flag;
    }

    /**
     * Registers the worker, then loops forever, running the leader or the
     * worker job once every two seconds according to the current role.
     */
    public function run()
    {
        $this->register();

        while (true) {
            if ($this->isLeader()) {
                $this->doLeaderJob();
            } else {
                $this->doWorkerJob();
            }

            sleep(2);
        }
    }

    /** Placeholder for the work done while this process is the leader. */
    public function doLeaderJob()
    {
        echo "Leading\n";
    }

    /** Placeholder for the work done while this process is a regular worker. */
    public function doWorkerJob()
    {
        echo "Working\n";
    }
}

// Connect to the local ZooKeeper server and start this worker; run() does not return.
$worker = new Worker('127.0.0.1:2181');
$worker->run();

可以启动3个php进程,查看脚本的运行。
进程1:
[root@localhost zookeeper]# php -f worker.php
I'm registred as: w-0000000010
Nobody here, I'm the leader
Leading
进程2:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000011
I'm watching w-0000000010
Working
进程3:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000012
I'm watching w-0000000011
Working

按 Ctrl + C 关闭 leader 进程(进程1)后,会发现进程2与进程3会选举出新的 leader:此时序号最小的进程2发现自己前面已没有其他节点,于是成为新的 leader,进程3则继续监视进程2。