热搜:NVER node 开发 php

php接收rabbitMQ消息并执行繁重任务

2024-07-20 21:30:01
php接收rabbitMQ消息并执行繁重任务

1) 建立消息队列基础类

<?php/** * @desc 消息队列 * @author caifangjie * @date 2016/05/03 */class Queue{    //交换机名称    protected $_exchangeName = 'ex_auto_home';        //队列名称    protected $_queueName = 'qu_auto_home';        //路由    protected $_routeKey = 'ru_auto_home';        protected $_connectHandler;        protected $_channelObject;    protected $_exchangeObject;        protected $_queueObject;        //配置信息    protected $_config = array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest');    //构造函数,依次创建通道,交换机,队列    public function __construct()    {        try{            $this->_connectHandler = new AMQPConnection($this->_config);            if(!$this->_connectHandler->connect()) {                die('connect failed');            }            $this->createChannel();            $this->createExchange();            $this->createQueue();        } catch(Exception $e) {            echo $e->getMessage();        }    }        //创建通道    protected function createChannel()    {        $this->_channelObject = new AMQPChannel($this->_connectHandler);    }        //创建交换机    public function createExchange($exchangeName='', $exchangeType=AMQP_EX_TYPE_DIRECT)    {        $exName = $exchangeName?$exchangeName:$this->_exchangeName;        $this->_exchangeObject = new AMQPExchange($this->_channelObject);        $this->_exchangeObject->setName($exName);        $this->_exchangeObject->setType($exchangeType);        $this->_exchangeObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_exchangeObject->declareExchange();    }        //创建队列    public function createQueue()    {        $this->_queueObject = new AMQPQueue($this->_channelObject);        $this->_queueObject->setName($this->_queueName);        $this->_queueObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_queueObject->declareQueue();        $this->_queueObject->bind($this->_exchangeObject->getName(), $this->_routeKey);    }  }


2)有一个任务,会连续向队列中推送消息,累计起来,队列中会有大量的消息.....

3)客户端连续的接受队列中的消息,并执行相应的任务

<?phprequire_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{    public function __construct()    {        parent::__construct();    }    //接受消息    public function recvMessage()    {        while (true) {            $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                   // var_dump($requestUrl);                    $execHandler = new ExecProcess();                    $execHandler->start($requestUrl);                    $execHandler->execSave();                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();


已知 require_once 'ExecProcess.class.php'; 这个类是没有问题的,单独执行可以通过,但是加到消息队列的客户端,接收消息,并执行一个繁重任务时,注:(php-cli模式下)执行时,客户端直接退出,无报错。

如果像下面这样时,则是可以正常运行,并打印队列中的消息的

<?php//require_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{    public function __construct()    {        parent::__construct();    }    //接受消息    public function recvMessage()    {        while (true) {            $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                    var_dump($requestUrl);//                    $execHandler = new ExecProcess();//                    $execHandler->start($requestUrl);//                    $execHandler->execSave();//                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();


也就是把ExecProcess.class.php'加进来时,接收消息的客户端,会自动退出,而且不会报错。。。

相反,去掉require_once 'ExecProcess.class.php';并把处理消息的逻辑去掉,是可以把队列中的消息打印出来的.....不知道是什么鬼

因为买的书,还没来得及看。

问题因该是很明显的,谁能给我一个思路,或者提示? 3Q


回复讨论(解决方案)

我觉得我很忧伤......这是要沉贴,翻船的节奏吗?

ExecProcess是不是出问题了什么

去掉require_once 'ExecProcess.class.php';并把处理消息的逻辑去掉,是可以把队列中的消息打印出来的.....

看你描述,应该是ExecProcess.class.php中,处理消息的部分出问题了,重点检查这部分代码,看看是什么异常。

执行繁重任务才出错,
可以检查是否执行超时导致。