行使Python学习RabbitMQ音信队列,pythonrabbitmq

4. ACK

实行多少个职责也许要求费用几秒钟,你恐怕会担忧壹旦一个主顾在实践职分进度中挂掉了。1旦RabbitMQ将音讯分发给了顾客,就能从内部存款和储蓄器中删除。在这种场馆下,倘若正在进行义务的买主宕机,会丢掉正在管理的新闻和散发给那么些消费者但并未有管理的消息。
可是,大家不想不见任何任务,假若有八个顾客挂掉了,那么大家应当将分发给它的天职交付给另多少个买主去处理。

为了确认保障消息不会丢掉,RabbitMQ支持消息应答。消费者发送三个新闻应答,告诉RabbitMQ那几个信息一度抽取并且管理完成了。RabbitMQ就可以去除它了。

故此手动ACK的广大手段

// 接收消息之后,主动ack/nak
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            System.out.println(" [ " + queue + " ] Received '" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
};

// 取消自动ack
channel.basicConsume(queue, false, consumer);

手动ack时,有个multiple,其意思表示:

能够知道为各类Channel维护三个unconfirm的信息序号集结,每publish一条数据,集结十月素加一,每次调一回handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录

 

利用Python学习RabbitMQ新闻队列,pythonrabbitmq

rabbitmq基本管理命令:

一步运营Erlang node和Rabbit应用:sudo rabbitmq-server

在后台运转Rabbit node:sudo rabbitmq-server -detached

关门全体节点(包含选取):sudo rabbitmqctl stop

add_user <UserName> <Password>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName>
<Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> …]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem>
…]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> …]

 

 

Demo:

producer.py

 1 #!/usr/bin/env python
 2 # -*- coding: utf_8 -*-
 3 # Date: 2015年11月30日
 4 # Author:蔚蓝行
 5 # 博客 http://www.cnblogs.com/duanv/
 6 
 7 import pika
 8 import sys
 9 
10 #创建连接connection到localhost
11 con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
12 #创建虚拟连接channel
13 cha = con.channel()
14 #创建队列anheng,durable参数为真时,队列将持久化;exclusive为真时,建立临时队列
15 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False)
16 #创建名为yanfa,类型为fanout的exchange,其他类型还有direct和topic,如果指定durable为真,exchange将持久化
17 cha.exchange_declare(durable=False,
18                      exchange='yanfa',
19                      type='direct',)
20 #绑定exchange和queue,result.method.queue获取的是队列名称
21 cha.queue_bind(exchange='yanfa',  
22                queue=result.method.queue,
23                routing_key='',) 
24 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
25 cha.basic_qos(prefetch_count=1)
26 #发送信息到队列‘anheng’
27 message = ' '.join(sys.argv[1:])
28 #消息持久化指定delivery_mode=2;
29 cha.basic_publish(exchange='',
30                   routing_key='anheng',
31                   body=message,
32                   properties=pika.BasicProperties(
33                      delivery_mode = 2,
34                  ))
35 print '[x] Sent %r' % (message,)
36 #关闭连接
37 con.close()

 

consumer.py

 1 #!/usr/bin/env python
 2 # -*- coding: utf_8 -*-
 3 # Date: 2015年11月30日
 4 # Author:蔚蓝行
 5 # 博客 http://www.cnblogs.com/duanv/
 6 import pika
 7 
 8 #建立连接connection到localhost
 9 con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
10 #创建虚拟连接channel
11 cha = con.channel()
12 #创建队列anheng
13 result=cha.queue_declare(queue='anheng',durable=True)
14 #创建名为yanfa,类型为fanout的交换机,其他类型还有direct和topic
15 cha.exchange_declare(durable=False,
16                      exchange='yanfa',  
17                      type='direct',)
18 #绑定exchange和queue,result.method.queue获取的是队列名称
19 cha.queue_bind(exchange='yanfa',
20                queue=result.method.queue,
21                routing_key='',)
22 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
23 cha.basic_qos(prefetch_count=1)
24 print ' [*] Waiting for messages. To exit press CTRL+C'
25 #定义回调函数
26 def callback(ch, method, properties, body):
27     print " [x] Received %r" % (body,)
28     ch.basic_ack(delivery_tag = method.delivery_tag)
29 
30 cha.basic_consume(callback,
31                   queue='anheng',
32                   no_ack=False,)
33 
34 cha.start_consuming()

 

一、概念:

Connection
多少个TCP的连年。Producer和Consumer都以通过TCP连接到RabbitMQ
Server的。程序的开首处正是树立那些TCP连接。

Channels: 虚构连接。建立在上述的TCP连接中。数据流动都以在Channel中进行的。一般景况是先后开始建立TCP连接,第一步正是确立那个Channel。

 

二、队列:

第三建构1个Connection,然后建设构造Channels,在channel上确立队列

树立即钦赐durable参数为真,队列将长久化;内定exclusive为真,队列为有的时候队列,关闭consumer后该队列将不再存在,一般景色下树立有时队列并不钦点队列名称,rabbitmq将轻巧起名,通过result.method.queue来获取队列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

区别:durable是队列长久化与否,假如为真,队列将在rabbitmq服务重启后仍存在,要是为假,rabbitmq服务重启前不会没有,与consumer关闭与否非亲非故;

而exclusive是确立有的时候队列,当consumer关闭后,该队列就能够被去除

 

三、exchange和bind

Exchange中durable参数钦点exchange是或不是长久化,exchange参数钦点exchange名称,type钦点exchange类型。Exchange类型有direct,fanout和topic。

Bind是将exchange与queue进行关联,exchange参数和queue参数分别钦定要开始展览bind的exchange和queue,routing_key为可选参数。

Exchange的三种形式:

Direct:

其余发送到Direct Exchange的音信都会被转接到routing_key中钦定的Queue

1.形似情状能够利用rabbitMQ自带的Exchange:””(该Exchange的名叫空字符串);

2.这种情势下无需将Exchange进行其余绑定(bind)操作;

三.音信传递时索要2个“routing_key”,能够回顾的敞亮为要发送到的队列名字;

四.万1vhost中不存在routing_key中钦命的行列名,则该音讯会被吐弃。

德姆o中就算声称了一个exchange=’yanfa’和queue=’anheng’的bind,然则在前面发送音讯时并不曾运用该exchange和bind,而是利用了direct的形式,未有钦赐exchange,而是钦赐了routing_key的称谓为队列名,音讯将发送到钦定队列。

若果三个exchange
评释为direct,并且bind中内定了routing_key,那么发送新闻时必要同期指明该exchange和routing_key.

Fanout:

任何发送到Fanout
Exchange的消息都会被转载到与该Exchange绑定(Binding)的富有Queue上

壹.得以清楚为路由表的情势

二.这种方式无需routing_key

叁.这种形式须求提前将Exchange与Queue进行绑定,1个Exchange能够绑定四个Queue,一个Queue能够同多少个Exchange进行绑定。

肆.要是接受到新闻的Exchange未有与其它Queue绑定,则音信会被丢掉。

德姆o中成立了二个将2个exchange和一个queue实行fanout类型的bind.可是发送新闻时未尝行使它,即使要用到它,只要在出殡和埋葬消息时钦点该exchange的称号就可以,该exchange就能够将音讯发送到全数和它bind的体系中。在fanout格局下,钦定的routing_key是低效的

Topic:

别的发送到Topic
Exchange的音讯都会被转正到持有关切
routing_key中钦命话题的Queue上

一.这种情势比较复杂,轻便的话,就是各样队列都有其关怀的宗旨,全部的新闻都包括八个“标题”(routing_key),Exchange会将音信转载到持有关切核心能与routing_key模糊匹配的体系。

贰.这种情势需求routing_key,恐怕要提前绑定Exchange与Queue。

3.在进展绑按时,要提供1个该队列关怀的大旨,如“#.log.#”表示该队列关心全体涉嫌log的音信(三个routing_key为”MQ.log.error”的新闻会被转化到该队列)。

4.“#”表示0个或若干个至关心器重要字,“*”表示三个器重字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述二者协作。

5.一律,假使Exchange未有开采能够与routing_key相配的Queue,则会屏弃此音讯。

 

四、职分分发

      
一.Rabbitmq的职务是循环分发的,要是翻开五个consumer,producer发送的消息是轮流发送到八个consume的。

2.在producer端使用cha.basic_publish()来发送音讯,当中body参数便是要发送的消息,properties=pika.BasicProperties(delivery_mode
= 贰,)启用音讯长久化,能够幸免RabbitMQ Server
重启也许crash引起的多少丢失。

三.在接收端使用cha.basic_consume()Infiniti循环监听,如若设置no-ack参数为真,每回Consumer接到数据后,而无论是是还是不是管理完毕,RabbitMQ
Server会立即把那个Message标识为成功,然后从queue中删去了。为了保障数据不被丢掉,RabbitMQ扶助新闻确认机制,即acknowledgments。为了保障数据能被精确管理而不唯有是被Consumer收到,那么大家不可能选择no-ack。而相应是在拍卖完数据后发送ack。

在拍卖多少后发送的ack,正是报告RabbitMQ数据已经被接收,管理完了,RabbitMQ能够去安全的删减它了。假如Consumer退出了而是并未有发送ack,那么RabbitMQ就可以把这几个Message发送到下三个Consumer。那样就确定保障了在Consumer非凡退出的状态下多少也不会丢掉。

这里并从未选拔超时机制。RabbitMQ仅仅通过Consumer的三番五次中断来认同该Message并从未被正确管理。也正是说,RabbitMQ给了Consumer丰富长的时光来做多少处理。

Demo的callback方法中ch.basic_ack(delivery_tag =
method.delivery_tag)告诉rabbitmq音讯已经正确管理。若是未有那条代码,Consumer退出时,Message会重新分发。然后RabbitMQ会占用更加多的内部存款和储蓄器,由于RabbitMQ会长期运作,因而那一个“内部存款和储蓄器泄漏”是沉重的。去调整这种不当,能够透过弹指间命令打字与印刷un-acked
Messages:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

      
四.公而忘私分发:设置cha.basic_qos(prefetch_count=一),那样RabbitMQ就能够使得种种Consumer在同1个日子点最多管理八个Message。换句话说,在收到到该Consumer的ack前,他它不会将新的Message分发给它。

 

五、注意:

劳动者和买主都应当注脚建构队列,网络教程上说第3遍创设假使参数和第三次不雷同,那么该操作尽管功成名就,可是queue的品质并不会被修改。

兴许因为版本难题,在笔者的测试中1经第3次证明建构的队列属性和第一次不完全同样,将报类似这种错40陆,
“PRECONDITION_FAILED – parameters for queue ‘anheng’ in vhost ‘/’ not
equivalent”

比方是exchange第2遍成立属性不一样,将报这种错40陆, “PRECONDITION_FAILED –
cannot redeclare exchange ‘yanfa’ in vhost ‘/’ with different type,
durable, internal or autodelete value”

如果第二回注明创设队列也应际而生那个荒唐,表达此前存在名字如出一辙的体系且此次表明的一些质量和前面扬言不一致,可通过命令sudo
rabbitmqctl
list_queues查看当前有啥队列。消除办法是声称建设构造另一称呼的队列或删除原有队列,若是原本队列是非长久化的,可经过重启rabbitmq服务删除原有队列,纵然原本队列是长久化的,只可以删除它所在的vhost,然后再重建vhost,再设置vhost的权柄(先认可该vhost中并未有其余有用队列)。

sudo rabbitmqctl delete_vhost /
sudo rabbitmqctl add_vhost /
sudo rabbitmqctl set_permissions -p / username ‘.*’ ‘.*’ ‘.*’

rabbitmq 基本管理命令: 一步运转Erlang node和Rabbit应用:sudo
rabbitmq-server 在后台运维Rabbit node:su…

应用Python学习RabbitMQ新闻队列,

RabbitMQ能够作为2个音信代理,它的中央原理非常简单:即收取和出殡和埋葬音讯,能够把它想象成七个邮局:我们把信件放入信箱,邮递员就能够把信件投递到你的收件人处,RabbitMQ正是2个邮箱、邮局、投递员功效综合体,整个进程就是:邮箱收到信件,邮局转发信件,投递员投递信件达到收件人处。

RabbitMQ和邮局的基本点差异就是RabbitMQ接收、存款和储蓄和殡葬的是贰进制数据—-新闻。

rabbitmq基本管理命令:

一步运转Erlang node和Rabbit应用:sudo rabbitmq-server

在后台运行Rabbit node:sudo rabbitmq-server -detached

关门全体节点(包罗利用):sudo rabbitmqctl stop

add_user <UserName> <Password>
delete_user <UserName>
change_password <UserName> <NewPassword>
list_users
add_vhost <VHostPath>
delete_vhost <VHostPath>
list_vhosts
set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
clear_permissions [-p <VHostPath>] <UserName>
list_permissions [-p <VHostPath>]
list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]

Demo:

producer.py

 #!/usr/bin/env python
 # -*- coding: utf_ -*-
 # Date: 年月日
 # Author:蔚蓝行
 # 博客 http://www.cnblogs.com/duanv/
 import pika
 import sys
 #创建连接connection到localhost
 con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 #创建虚拟连接channel
 cha = con.channel()
 #创建队列anheng,durable参数为真时,队列将持久化;exclusive为真时,建立临时队列
 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False)
 #创建名为yanfa,类型为fanout的exchange,其他类型还有direct和topic,如果指定durable为真,exchange将持久化
 cha.exchange_declare(durable=False,
           exchange='yanfa',
           type='direct',)
 #绑定exchange和queue,result.method.queue获取的是队列名称
 cha.queue_bind(exchange='yanfa', 
        queue=result.method.queue,
        routing_key='',) 
 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
 cha.basic_qos(prefetch_count=)
 #发送信息到队列‘anheng'
 message = ' '.join(sys.argv[:])
 #消息持久化指定delivery_mode=;
 cha.basic_publish(exchange='',
          routing_key='anheng',
          body=message,
          properties=pika.BasicProperties(
           delivery_mode = ,
         ))
 print '[x] Sent %r' % (message,)
 #关闭连接
 con.close()

consumer.py

 #!/usr/bin/env python
 # -*- coding: utf_ -*-
 # Date: 年月日
 # Author:蔚蓝行
 # 博客 http://www.cnblogs.com/duanv/
 import pika
 #建立连接connection到localhost
 con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 #创建虚拟连接channel
 cha = con.channel()
 #创建队列anheng
 result=cha.queue_declare(queue='anheng',durable=True)
 #创建名为yanfa,类型为fanout的交换机,其他类型还有direct和topic
 cha.exchange_declare(durable=False,
           exchange='yanfa', 
           type='direct',)
 #绑定exchange和queue,result.method.queue获取的是队列名称
 cha.queue_bind(exchange='yanfa',
        queue=result.method.queue,
        routing_key='',)
 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
 cha.basic_qos(prefetch_count=)
 print ' [*] Waiting for messages. To exit press CTRL+C'
 #定义回调函数
 def callback(ch, method, properties, body):
   print " [x] Received %r" % (body,)
   ch.basic_ack(delivery_tag = method.delivery_tag)
 cha.basic_consume(callback,
          queue='anheng',
          no_ack=False,)
 cha.start_consuming()

一、概念:

Connection: 3个TCP的连接。Producer和Consumer都是经过TCP连接到RabbitMQ
Server的。程序的开头处就是创设这一个TCP连接。

Channels:
虚拟连接。创立在上述的TCP连接中。数据流动都是在Channel中展开的。一般景观是先后开头建构TCP连接,第二步正是起家那些Channel。

二、队列:

先是成立叁个Connection,然后建构Channels,在channel上创立队列

确马上内定durable参数为真,队列将持久化;内定exclusive为真,队列为有时队列,关闭consumer后该队列将不再存在,一般意况下创制有的时候队列并不钦命队列名称,rabbitmq将随便起名,通过result.method.queue来获取队列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

差异:durable是队列长久化与否,即便为真,队列就要rabbitmq服务重启后仍存在,若是为假,rabbitmq服务重启前不会破灭,与consumer关闭与否毫无干系;

而exclusive是赤手空拳有时队列,当consumer关闭后,该队列就能被删除

三、exchange和bind

Exchange中durable参数钦赐exchange是还是不是悠久化,exchange参数钦点exchange名称,type钦赐exchange类型。Exchange类型有direct,fanout和topic。

Bind是将exchange与queue进行关联,exchange参数和queue参数分别钦点要拓展bind的exchange和queue,routing_key为可选参数。

Exchange的二种方式:

Direct:

任何发送到Direct Exchange的新闻都会被转接到routing_key中钦赐的Queue

1.貌似意况能够接纳rabbitMQ自带的Exchange:””(该Exchange的名为空字符串);

二.这种格局下无需将Exchange进行任何绑定(bind)操作;

三.新闻传递时要求二个“routing_key”,能够简简单单的领会为要发送到的行列名字;

4.1旦vhost中不设有routing_key中钦点的系列名,则该新闻会被舍弃。

德姆o中就算声称了3个exchange=’yanfa’和queue=’anheng’的bind,然而在后头发送音信时并从未行使该exchange和bind,而是利用了direct的形式,没有钦赐exchange,而是内定了routing_key的称号为队列名,音讯将发送到钦定队列。

假诺贰个exchange
表明为direct,并且bind中钦命了routing_key,那么发送音讯时索要同一时间指明该exchange和routing_key.

Fanout:

其它发送到Fanout
Exchange的信息都会被转化到与该Exchange绑定(Binding)的兼具Queue上

1.方可驾驭为路由表的情势

二.这种形式不需求routing_key

三.这种方式供给提前将Exchange与Queue进行绑定,多个Exchange能够绑定四个Queue,1个Queue可以同多少个Exchange进行绑定。

四.假使接受到音信的Exchange未有与别的Queue绑定,则音讯会被丢掉。

德姆o中成立了一个将一个exchange和3个queue实行fanout类型的bind.不过发送音讯时未尝选取它,就算要用到它,只要在出殡和埋葬消息时钦点该exchange的称号就可以,该exchange就能将音讯发送到全体和它bind的类别中。在fanout形式下,内定的routing_key是无效的

Topic:

其余发送到Topic
Exchange的音信都会被转接到持有关怀routing_key中钦点话题的Queue上

一.这种形式比较复杂,轻易的话,便是每一种队列都有其关怀的大旨,全数的音信都包括3个“标题”(routing_key),Exchange会将音信转载到持有关切核心能与routing_key模糊相配的行列。

二.这种情势须要routing_key,可能要提早绑定Exchange与Queue。

三.在开始展览绑定期,要提供一个该队列关注的大旨,如“#.log.#”表示该队列关心全数关乎log的新闻(二个routing_key为”MQ.log.error”的新闻会被转接到该队列)。

4.“#”表示0个或若干个相当重要字,“*”表示3个最首要字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述贰者合营。

伍.一仍其旧,假如Exchange未有意识能够与routing_key相配的Queue,则会丢掉此音信。

四、任务分发

一.Rabbitmq的天职是循环分发的,假如展开八个consumer,producer发送的新闻是轮岗发送到七个consume的。

2.在producer端使用cha.basic_publish()来发送音信,个中body参数就是要发送的音信,properties=pika.BasicProperties(delivery_mode
= 二,)启用音信长久化,可避防卫RabbitMQ Server
重启恐怕crash引起的多寡丢失。

三.在接收端使用cha.basic_consume()Infiniti循环监听,假设设置no-ack参数为真,每趟Consumer接到数据后,而不论是还是不是管理完了,RabbitMQ
Server会立即把那一个Message标识为成功,然后从queue中去除了。为了保险数据不被遗失,RabbitMQ协助信息确认机制,即acknowledgments。为了有限补助数据能被正确管理而不光是被Consumer收到,那么大家无法动用no-ack。而相应是在管理完数据后发送ack。

在管理多少后发送的ack,正是告诉RabbitMQ数据已经被吸收接纳,管理完结,RabbitMQ能够去安全的去除它了。假诺Consumer退出了可是未有发送ack,那么RabbitMQ就能把那么些Message发送到下2个Consumer。那样就保证了在Consumer相当退出的景况下数据也不会丢掉。

此处并未应用超时机制。RabbitMQ仅仅经过Consumer的接连中断来确认该Message并从未被精确管理。约等于说,RabbitMQ给了Consumer丰硕长的年华来做多少处理。

Demo的callback方法中ch.basic_ack(delivery_tag =
method.delivery_tag)告诉rabbitmq音信一度准确管理。假如未有那条代码,Consumer退出时,Message会重新分发。然后RabbitMQ会占用更加的多的内部存款和储蓄器,由于RabbitMQ会短时间运作,由此那一个“内部存款和储蓄器泄漏”是沉重的。去调解这种不当,能够经过须臾间发令打字与印刷un-acked
Messages:

sudo rabbitmqctl list_queues name messages_ready
messages_unacknowledged

4.公而忘私分发:设置cha.basic_qos(prefetch_count=一),这样RabbitMQ就能使得各样Consumer在同叁个光阴点最多管理3个Message。换句话说,在接到到该Consumer的ack前,他它不会将新的Message分发给它。

五、注意:

生产者和消费者都应有注解建设构造队列,网络教程上说第三回创立如果参数和第一次不均等,那么该操作纵然打响,可是queue的习性并不会被涂改。

大概因为版本难点,在自家的测试中就算第2遍评释创立的种类属性和率先次不一模二样,将报类似这种错40陆,
“PRECONDITION_FAILED – parameters for queue ‘anheng’ in vhost ‘/’ not
equivalent”

借使是exchange第壹回创造属性不一致,将报这种错40陆, “PRECONDITION_FAILED –
cannot redeclare exchange ‘yanfa’ in vhost ‘/’ with different type,
durable, internal or autodelete value”

要是第叁次表明创建队列也出现那个荒唐,表明在此之前存在名字如出1辙的体系且此番注脚的某个质量和事先宣称分化,可经过命令sudo
rabbitmqctl
list_queues查看当前有何队列。化解方法是声称建构另一称谓的连串或删除原有队列,要是原本队列是非长久化的,可因此重启rabbitmq服务删除原有队列,假诺原本队列是持久化的,只可以删除它所在的vhost,然后再重建vhost,再安装vhost的权杖(先认同该vhost中从不其余有用队列)。

sudo rabbitmqctl delete_vhost /
sudo rabbitmqctl add_vhost /
sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'

如上内容是作者给大家介绍的应用Python学习RabbitMQ消息队列,希望我们喜爱。

2. 一灰灰Blog: 

1灰灰的个人博客,记录全部学习和行事中的博文,应接大家前去逛逛

 

您只怕感兴趣的篇章:

  • php Memcache 中贯彻消息队列
  • PHP下操作Linux音信队列实现经过间通讯的秘诀
  • 进程间通讯之深深消息队列的详解
  • android开垦教程之使用looper管理音讯队列
  • python使用rabbitmq实现互连网爬虫示例
  • PHP+memcache完毕音信队列案例分享
  • Python复制文件操作实例详解
  • Python基于pygame实现的弹力球效果(附源码)

RabbitMQ能够当作3个音讯代理,它的中坚原理特别简单:即接到和发送消息,能够把它想象成2个邮局:笔者…

一. Exchange暗中同意场景

将前方的音信发送代码捞出来,干掉Exchange的声明,如下

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //创建连接
        Connection connection = factory.newConnection();

        //创建消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);

        // 发布消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

在昭示音讯时,传入的Exchange名称叫“”,再到调整台查看,开采数目被投递到了(AMQP
default)那些调换器,对应的截图如下

 

图片 1

image

看一下地点的绑定描述内容,敬服如下

  • 暗中认可沟通器采纳Direct战略
  • 将rountingKey绑定到同名的queue上
  • 不协助显得的绑定和平化解绑

上边的代码为了演示数据的流向,在发布音信的同一时间也定义了三个同名的Queue,由此得以在调节台上看看同名的
“hello” queue,且个中有20条数据

当大家去掉queue的宣示时,会发觉另3个主题素材,投入的多少好像并不曾存下来(因为从没queue来接收那一个数据,而从此再声称queue时,以前的数据也不会分配过来)

发布订阅格局存在有的标题:

I. 背景

前1篇基本接纳篇的博文中,介绍了rabbitmq的两种选择姿势,能够通晓怎么向RabbitMQ发送音信以及怎样消费,但遗留下多少个问号,本篇则第2意在弄通晓这几点

  • Exchange表明的难题(是还是不是必须申明,假使不注脚会如何)
  • Exchange注解的多少个参数(durable, autoDelete)有何差异
  • 当未有队列和Exchange绑按时,直接往队列中塞数据,好像不会有数据增添(即先塞数据,然后创设queue,创立绑定,从调整台上看那一个queue里面也不会有多少)
  • 音讯消费的三种姿势(2个积极向上去拿多少,七个是rabbit推数据)相比较

2、即使设置音信长久化为true,但队列设置成排他性队列,那么在RabbitMQ重启之后,音讯是不是仍然存在。请自行检索分析,后一次深入分析该难题。

II. 基本进级篇

设置了队列和消息漫长化后,当服务重启之后,音信依然存在。只设置队列长久化,不设置音信长久化,重启之后新闻会丢掉;只设置音信长久化,不设置队列长久化,在劳动重启后,队列会消失,从而依据于队列的新闻也会丢掉。只设置新闻长久化而不设置队列的长久化,毫无意义。

III. 其他

 1 MQ SDK新增接口:
 2 IMQSession新增方法:
 3 /// <summary>
 4         /// 创建消息消费者
 5         /// </summary>
 6         /// <param name="topicName">主题名称</param> 
 7         /// <param name="customTopicQueueName">自定义Topic关联队列名称</param>
 8         /// <param name="isPersistence">是否持久化</param>
 9         /// <returns>消息消费者</returns>
10         IMessageConsumer CreateTopicConsumer(string topicName, string customTopicQueueName, bool isPersistence = false);
11 调用方式:消费端需要明确指定需要消费的发布订阅关联队列。例如配置中心热部署,每个配置中心实例都需要指定唯一的关联队列名。
12 这样就可以和正常的MAC队列消费一样,消费指定队列消息。
13 
14 实现方式,四个步骤:
15 1.创建持久化Topic(即持久化Exchange):
16   var service = MQServiceProvider.GetDefaultMQService();
17             var messageText = "abc";
18             ///创建Topic
19             using (var connection = service.CreateConnection())
20             {
21                 var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
22                 var messageCreator = service.GetMessageCreator();
23                 var message = messageCreator.CreateMessage(messageText);
24                 message.IsPersistent = true;
25                 var producer = session.CreateProducer();
26                 var topic = session.DeclareTopic(topicName, true);
27             }
28 2.定义消费者Consumer:
29 List<string> queueList = new List<string>() {
30                 "guozhiqi1",
31                 "guozhiqi2",
32                 "guozhiqi3",
33                 "guozhiqi4",
34                 "guozhiqi5",
35                 "guozhiqi6",
36                 "guozhiqi7",
37                 "guozhiqi8",
38                 "guozhiqi9",
39             };
40             //var service = MQServiceProvider.GetDefaultMQService();
41             //var messageText = "abc" + DateTime.Now.ToShortTimeString();
42             //定义消费者
43             using (var connection1 = service.CreateConnection())
44             {
45                 var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
46                 foreach (var item in queueList)
47                 {
48                     session1.DeclareQueue(item, true);
49                     var consumer = session1.CreateTopicConsumer(topicName, item, true);
50                 }
51             }
52 3.发送消息到Topic
53  //发送消息
54             for (int i = 0; i <= 100; i++)
55             {
56                 using (var connection = service.CreateConnection())
57                 {
58                     var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
59                     var messageCreator = service.GetMessageCreator();
60                     var message = messageCreator.CreateMessage(messageText);
61                     message.IsPersistent = true;//设置持久化
62                    message.TimeToLive = TimeSpan.FromSeconds(30);//设置过期时间
63                     var producer = session.CreateProducer();
64                     var topic = session.DeclareTopic(topicName, true);
65                     producer.Send(message, topic);
66                 }
67             }
68 4.从队列接收消息
69 Parallel.ForEach(queueList, (item) =>
70             {
71                 while (true)
72                 {
73                     //接收消息
74                     using (var connection1 = service.CreateConnection())
75                     {
76                         var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
77 
78                         session1.DeclareQueue(item, true);
79                         var consumer = session1.CreateTopicConsumer(topicName, item, true);
80                         var topic = session1.DeclareTopic(topicName, true);
81                         var receivedmessage = consumer.Receive(topic);
82                         var textMessage = receivedmessage as ITextMessage;
83 
84                         Assert.AreEqual(messageText, textMessage.Body);
85                         consumer.Acknowledge(receivedmessage);
86                     }
87                 }
88 
89             });

RabbitMQ基础教程之使用进级篇

相关博文,推荐查看:

  1. RabbitMq基础教程之安装与测试
  2. RabbitMq基础教程之基本概念
  3. RabbitMQ基础教程之焦点选拔篇

叁、优先级机制

c. 小结

  • 当多个Queue已经宣示好了之后,不可能更新durable大概autoDelted值;当须求修改时,供给先删除再重复表明
  • 消费的Queue表明应该和投递的Queue声明的
    durable,autoDelted属性1致,不然会报错
  • 对此重大的数目,一般安装 durable=true, autoDeleted=false
  • 对此设置 autoDeleted=true 的体系,当未有消费者未来,队列会自动被删去

Message长久化:发送端将音讯发送至Exchange,Exchange将音信转载至关联的Queue,音讯存款和储蓄于实际的Queue中。纵然RabbitMQ重启之后,由于Message未设置长久化,那么音信会在重启之后丢失。

1. 参考

Java Client API Guide

2、音讯的确认机制。

3. Durable, autoDeleted参数

在定义Queue时,能够钦定那七个参数,这五个参数的差异是怎样吧?

Exchange
长久化:若是不设定Exchange悠久化,那么在RabbitMQ由于有个别相当等原因重启之后,Exchange会丢失。Exchange丢失,
会影响发送端发送消息到RabbitMQ。

3. 声明

尽信书则比不上,已上内容,纯属一家之辞,因个体工夫有限,难免有遗漏和不当之处,如发掘bug可能有更加好的建议,招待研究指正,不吝感谢

  • 腾讯网地址: 小灰灰Blog
  • QQ: 一灰灰/3302797840

固然漫长化会促成品质损耗,但为了生产蒙受的多寡1致性,那是大家务必做出的取舍。但我们能够由此设置音信过期时间、下跌发送音讯大小等其他措施来尽量的减退MQ质量的减退。

a. durable

持久化,保险RabbitMQ在剥离或许crash等至极景况下数据未有丢失,需求将queue,exchange和Message都漫长化。

假定将queue的长久化标志durable设置为true,则意味是二个持久的行列,那么在服务重启之后,也会设有,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新什么从前被悠久化的queue。队列是足以被持久化,不过中间的新闻是还是不是为长久化那还要看新闻的长久化设置。也正是说,重启从前特别queue里面还一直不发出去的音讯的话,重启之后那队列之中是否还留存原来的音信,这几个就要取决于产生着在发送消息时对信息的安装

上期热门难题:

b. autoDeleted

自动删除,就算该队列未有任何订阅的主顾来讲,该队列会被活动删除。这种队列适用于一时队列

以此相比较便于演示了,当二个Queue被安装为活动删除时,当消费者断掉之后,queue会被去除,那些第三针对的是局地不是专程重大的数量,不希望出现音讯积存的情况

// 倒数第二个参数,true表示开启自动删除
// 正数第二个参数,true表示持久化
channel.queueDeclare(queue, true, false, true, null);
/// <summary>(Spec method) Declare a queue.</summary>
        [AmqpMethodDoNotImplement(null)]
        QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive,
            bool autoDelete, IDictionary<string, object> arguments);  

4. 围观关怀

 

图片 2

QrCode

 

2. 绑定之后才有数据

先是是将调整台北的hello那一个queue删掉,然后重新实践上边包车型大巴代码(相对于前方的就是注释了queue的证明)

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //创建连接
        Connection connection = factory.newConnection();

        //创建消息通道
        Channel channel = connection.createChannel();
        //        channel.queueDeclare(queue, true, false, true, null);

        // 发布消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

接下来从调整台上看,能够看来有数据写入Exchange,不过尚未queue来接收这个多少

 

图片 3

IMAGE

接下来张开消费进程,然后重新实行上边包车型大巴塞入数据,新前边重新塞入的数目能够被消费;不过在此以前塞入的数额则尚未,消费音信的代码如下:

public class MyDefaultConsumer {
    public void consumerMsg(String queue) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //创建连接
        Connection connection = factory.newConnection();

        //创建消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    System.out.println(" [ " + queue + " ] Received '" + message);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 取消自动ack
        channel.basicConsume(queue, false, consumer);
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        MyDefaultConsumer consumer = new MyDefaultConsumer();
        consumer.consumerMsg("hello");

        Thread.sleep(1000 * 60 * 10);
    }
}

小结:

  • 由此上边的示范得知一点
  • 当未有Queue绑定到Exchange时,往Exchange中写入的音讯也不会再度分发到未来绑定的queue上

Exchange持久化:

 BasicPublish 的定义:

参数表明:exchange:RabbitMQ中定义的Exchange名称,type:类型,包蕴fanout、topic、direct、headers,durable:悠久化设置。设置成true,就足以设定exchange长久化存储,autodelete:是或不是自动删除。

发表评论

电子邮件地址不会被公开。 必填项已用*标注