rocketmq-php-client 下载地址:https://github.com/zhongwenyu/rocketMQ
thinkphp6框架中开发代码:
<?php
/**
* RockerMq生产者
*/
declare (strict_types=1);
namespace appcontrollermq;
use thinkException;
class RocketMqProducer extends Base
{
public function __construct()
{
}
/**
* 发送消息
*/
public function send()
{
$InstanceName = WmsStock ;
$NamesrvAddr = 192.168.100.209:9876 ;
$producer = new RocketMQProducer($InstanceName);
$producer->setInstanceName($InstanceName); //设置实例名
$producer->setNamesrvAddr($NamesrvAddr); //设置名字服务链接地址
//$producer->setTcpTransportPullThreadNum(40); //设置传出链接线程数 默认是cpu core的数值
//$producer->setTcpTransportConnectTimeout(3000); //设置tcp链接超时时间,单位是毫秒
//$producer->setTcpTransportTryLockTimeout(3000); //设置申请锁超时时间,单位是毫秒
//$producer->setSendMsgTimeout(1000); //发送消息超时时间,单位是毫秒
$producer->start();
//生产消息
$message = new RocketMQMessage( TIANMASPORT , MQ_TAG_SUPPLIER , 发送内容 hello world! );
$sendResult = $producer->send($message);
print_r($sendResult);
printf(“|%-30s|%-40s|
“, “msgId”, $sendResult->getMsgId());printf(“|%-30s|%-40s|
“, “offsetMsgId”, $sendResult->getOffsetMsgId());printf(“|%-30s|%-40s|
“, “sendStatus”, $sendResult->getSendStatus());printf(“|%-30s|%-40s|
“, “queueOffset”, $sendResult->getQueueOffset());printf(“|%-30s|%-40s|
“, “body”, $message->getBody());$producer->shutdown();
}
}