bl双性强迫侵犯h_国产在线观看人成激情视频_蜜芽188_被诱拐的少孩全彩啪啪漫画

rabbitmq延遲隊(duì)列之php實(shí)現(xiàn)

延遲任務(wù)應(yīng)用場(chǎng)景

我們提供的服務(wù)有:成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、微信公眾號(hào)開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、蘇州ssl等。為超過(guò)千家企事業(yè)單位解決了網(wǎng)站和推廣的問(wèn)題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的蘇州網(wǎng)站制作公司

場(chǎng)景一:物聯(lián)網(wǎng)系統(tǒng)經(jīng)常會(huì)遇到向終端下發(fā)命令,如果命令一段時(shí)間沒(méi)有應(yīng)答,就需要設(shè)置成超時(shí)。

場(chǎng)景二:訂單下單之后30分鐘后,如果用戶沒(méi)有付錢,則系統(tǒng)自動(dòng)取消訂單。

實(shí)現(xiàn)方案

  1. 定時(shí)任務(wù)輪詢數(shù)據(jù)庫(kù),看是否有產(chǎn)生新任務(wù),如果產(chǎn)生則消費(fèi)任務(wù)

  2. pcntl_alarm為進(jìn)程設(shè)置一個(gè)鬧鐘信號(hào)

  3. swoole的異步高精度定時(shí)器:swoole_time_tick(類似javascript的setInterval)和swoole_time_after(相當(dāng)于javascript的setTimeout)

  4. rabbitmq延遲任務(wù)

以上四種方案,如果生產(chǎn)環(huán)境有使用到swoole建議使用第三種方案。此篇文章重點(diǎn)講述第四種方案實(shí)現(xiàn)

Rabbitmq延遲隊(duì)列實(shí)現(xiàn)

RabbitMQ沒(méi)有直接去實(shí)現(xiàn)延遲隊(duì)列這個(gè)功能。而是需要通過(guò)消息的TTL和死信Exchange這兩者的組合來(lái)實(shí)現(xiàn)。


消息的TTL(Time To Live)

消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。

可以通過(guò)設(shè)置消息的expiration字段或者隊(duì)列x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。下面例子是通過(guò)隊(duì)列的ttl實(shí)現(xiàn)死信

$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']?:'');
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
        'x-dead-letter-exchange' => 'delay_exchange',
        'x-dead-letter-routing-key' => 'delay_route',
        'x-message-ttl' => 60000,
));
$queue->declareQueue();

當(dāng)上面的消息扔到該隊(duì)列中后,過(guò)了60秒,如果沒(méi)有被消費(fèi),它就死了。不會(huì)被消費(fèi)者消費(fèi)到。這個(gè)消息后面的,沒(méi)有“死掉”的消息對(duì)頂上來(lái),被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會(huì)被刪除和釋放,它會(huì)被統(tǒng)計(jì)到隊(duì)列的消息數(shù)中去。單靠死信還不能實(shí)現(xiàn)延遲任務(wù),還要靠Dead Letter Exchange。


Dead Letter Exchanges

Exchage的概念在這里就不在贅述,可以從這里進(jìn)行了解。一個(gè)消息在滿足如下條件下,會(huì)進(jìn)死信路由,記住這里是路由而不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)很多隊(duì)列。

1. 一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。

2. 上面的消息的TTL到了,消息過(guò)期了。

3. 隊(duì)列的長(zhǎng)度限制滿了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。

Dead Letter Exchange其實(shí)就是一種普通的exchange,和創(chuàng)建其他exchange沒(méi)有兩樣。只是在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過(guò)期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去。

示例

生產(chǎn)者:

<?php

header('Content-Type:text/html;charset=utf8;');

$params = array(
    'exchangeName' => 'test_cache_exchange',
    'queueName' => 'test_cache_queue',
    'routeKey' => 'test_cache_route',
);

$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'rabbitmq',
    'password' => 'rabbitmq',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴(kuò)展

//exit();
try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 記錄日志
        echo 'rabbit-mq 連接錯(cuò)誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 記錄日志
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setFlags(AMQP_DURABLE);//持久化
    $exchange->setName($params['exchangeName']?:'');
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']?:'');
    $queue->setFlags(AMQP_DURABLE);
    $queue->setArguments(array(
        'x-dead-letter-exchange' => 'delay_exchange',
        'x-dead-letter-routing-key' => 'delay_route',
        'x-message-ttl' => 60000,
    ));
    $queue->declareQueue();

    //綁定
    $queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {

}


//$num = mt_rand(100, 500);
$num = 1;

//生成消息
$exchange->publish("this is test message..", $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));

消費(fèi)者:

<?php

header('Content-Type:text/html;charset=utf8;');

$params = array(
    'exchangeName' => 'delay_exchange',
    'queueName' => 'delay_queue',
    'routeKey' => 'delay_route',
);

$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'rabbitmq',
    'password' => 'rabbitmq',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));

//exit();

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 記錄日志
        echo 'rabbit-mq 連接錯(cuò)誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 記錄日志
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setFlags(AMQP_DURABLE);//聲明一個(gè)已存在的交換器的,如果不存在將拋出異常,這個(gè)一般用在consume端
    $exchange->setName($params['exchangeName']?:'');
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']?:'');
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //綁定
    $queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
    echo $e->getMessage();
    exit();
}

function callback(AMQPEnvelope $message) {
    global $queue;
    if ($message) {
        $body = $message->getBody();
        echo $body . PHP_EOL;
        $queue->ack($message->getDeliveryTag());
    } else {
        echo 'no message' . PHP_EOL;
    }
}

//$queue->consume('callback');  第一種消費(fèi)方式,但是會(huì)阻塞,程序一直會(huì)卡在此處

//第二種消費(fèi)方式,非阻塞
$start = time();
while(true)
{
    $message = $queue->get();
    if(!empty($message))
    {
        echo $message->getBody();
        $queue->ack($message->getDeliveryTag());    //應(yīng)答,代表該消息已經(jīng)消費(fèi)
        $end = time();
        echo '<br>' . ($end - $start);
        exit();
    }
    else
    {
        //echo 'message not found' . PHP_EOL;
    }
}

這個(gè)示例注意要跟上一篇博文示例作對(duì)比rabbitmq以及php amqp擴(kuò)展使用,最關(guān)鍵的點(diǎn)就是在生產(chǎn)者那里

$queue->setArguments(array(
        'x-dead-letter-exchange' => 'delay_exchange',
        'x-dead-letter-routing-key' => 'delay_route',
        'x-message-ttl' => 60000,
));

詳細(xì)過(guò)程:

  1. 首先由正常隊(duì)列(test_cache_queue)和正常exchange(test_cache_exchange),兩者相綁定。

  2. 該正常隊(duì)列設(shè)置了死信路由(delay_exchange)和死信路由key以及TTL,生產(chǎn)者生產(chǎn)消息到正常隊(duì)列和正常路由上.

  3. 當(dāng)正常隊(duì)列設(shè)置TTL時(shí)間一到,那延遲消息就會(huì)自動(dòng)發(fā)布到死信路由

  4. 消費(fèi)者通過(guò)死信路由(delay_exchange)和死信隊(duì)列(delay_queue)來(lái)消費(fèi)

參考文章:

https://www.cnblogs.com/haoxinyue/p/6613706.html

標(biāo)題名稱:rabbitmq延遲隊(duì)列之php實(shí)現(xiàn)
瀏覽路徑:http://vcdvsql.cn/article6/gjosig.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、電子商務(wù)、定制網(wǎng)站網(wǎng)站改版、網(wǎng)站維護(hù)、域名注冊(cè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)