[PHP语言] PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

[复制链接]
查看1851 | 回复14 | 2020-10-31 15:04:39 | 显示全部楼层 |阅读模式

延时队列

  • Delayproducer.Php

  • Amqpbuilder.Php

AmqpBuilder.php

<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;
class AmqpBuilder extends QueueBuilder
{
    /**
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     *
     * @return \Hyperf\Amqp\Builder\Builder
     */
    public function setArguments($arguments) : Builder
    {
        $this->arguments = array_merge($this->arguments, $arguments);
        return $this;
    }
    /**
     * 设置延时队列相关参数
     *
     * @param string $queueName
     * @param int    $xMessageTtl
     * @param string $xDeadLetterExchange
     * @param string $xDeadLetterRoutingKey
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        $this->setArguments([
            'x-message-ttl'             => ['I', $xMessageTtl * 1000], // 毫秒
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queueName);
        return $this;
    }
}

DelayProducer.php

<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
class DelayProducer extends Builder
{
    /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout)
        {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }
    /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws \Throwable
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;
        $this->injectMessageProperty($producerMessage);
        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool    = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();
        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }
        $channel->set_ack_handler(function () use (&$result)
        {
            $result = true;
        });
        try {
            // 处理延时队列
            $exchangeBuilder = $producerMessage->getExchangeBuilder();
            // 队列定义
            $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
            // 路由定义
            $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
            // 队列绑定
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            // 消息发送
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        }
        finally {
            $connection->release();
        }
        return $confirm ? $result : true;
    }
    /**
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (class_exists(AnnotationCollector::class)) {
            /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
            $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
            if ($annotation) {
                $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
                $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
            }
        }
    }
}

处理超时订单

  • Orderqueueconsumer.Php

  • Orderqueueproducer.Php

Orderqueueproducer.php

<?php
declare(strict_types = 1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;
/**
 * @Producer(exchange="order_exchange", routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    public function __construct($data)
    {
        $this->payload = $data;
    }
    public function getExchangeBuilder() : ExchangeBuilder
    {
        return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
    }
}

Orderqueueconsumer.php

<?php
declare(strict_types = 1);
namespace App\Amqp\Consumer;
use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
/**
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    public function consume($data) : string
    {
       ##业务处理
    }
    public function isEnable() : bool
    {
        return true;
    }
}

Demo

$builder = new AmqpBuilder();
        $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
        $que = ApplicationContext::getContainer()->get(DelayProducer::class);
        var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))

推荐教程:《PHP教程》

以上就是PHP 框架 Hyperf 实现处理超时未支付订单和延时队列的详细内容,更多请关注爱上源码网其它相关文章!

  • 微信
  • 分享
  • 相关标签:php hyperf
  • 本文转载于:learnku,如有侵犯,请联系916990011@qq.com删除
    • 上一篇:PHP 消息队列 Kafka 使用
    • 下一篇:PHP面向对象到底是啥?十分钟通俗易懂图文教程

    相关文章

    相关视频

    • PHP 微服务集群搭建 - Hyperf
    • 如何基于Hyperf实现RabbitMQ+WebS...
    • 如何通过PHPStorm配置Hyperf热更新开发...
    • PHP 框架 Hyperf 实现处理超时未支付订单...
  • ThinkPHP6.0 数据库
  • ThinkPHP6.0 请求
  • ThinkPHP6.0 模型
  • ThinkPHP6.0 杂项
  • 本文有爱上源码下载完入驻作者发布,如果对您版权造成侵害,可以联系本站站长管理进行维权删除,本站收到维权24小时内进行处理,谢谢您关注23ym.cn! 本站分享大量程序员技术文章以及对编程开发的初级入门教程,包括图文讲解笔记和高清视频下载~
    回复

    使用道具 举报

    愿为素心人 | 2020-11-9 09:15:27 | 显示全部楼层
    找了好多地方,终于找到了
    回复

    使用道具 举报

    文叶儿 | 2021-7-13 17:33:58 | 显示全部楼层
    悟空源码资源不错。粉了
    回复

    使用道具 举报

    游戏乾坤 | 2022-4-14 05:20:23 | 显示全部楼层
    6666悟空源码资源多!
    回复

    使用道具 举报

    后会无期846 | 2022-7-31 21:04:01 | 显示全部楼层
    感谢悟空源码分享精品资源!
    回复

    使用道具 举报

    这个下载站资源真齐全
    回复

    使用道具 举报

    街角386 | 2022-8-4 15:00:30 | 显示全部楼层
    这个站很好,资源多,教程全
    回复

    使用道具 举报

    意乱了真假意wk | 2022-12-13 14:05:45 | 显示全部楼层
    感谢悟空源码分享精品资源!
    回复

    使用道具 举报

    非究思身 | 2022-12-19 11:47:03 | 显示全部楼层
    资源太多了,准备办个会员
    回复

    使用道具 举报

    Holily1985 | 2023-2-27 20:52:27 | 显示全部楼层
    悟空源码资源不错。粉了
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则