RabbitMQ 的使用
# RabbitMQ 的使用
RabbitMQ
是使用 Erlang
编写的开源消息队列,支持多种协议,非常适合企业及开发。
# 安装
RabbitMQ
使用 Erlang
编写,所以在安装之前需要首先安装 Erlang
环境。
全部下载最新稳定版即可。
下载并安装之后,需要配置管理界面。通过命令直接启用即可:
rabbitmq-plugins ebable rabbitmq_management
如果没有配置环境变量,可以进入安装目录下的 /sbin
文件夹中执行,如图:
建议
建议将安装路径下的 /sbin
路径添加到环境变量中,以便后续使用命令。
提示
RabbitMQ
默认监听端口为 5672
。而管理后台的端口则是 15672
,所以启用管理之后,可以通过浏览器访问:http://localhost:15672/
# 配置
# 添加用户
通过命令可以快速管理用户:
# 查看用户列表
rabbitmqctl list_users
# 添加用户
# rabbitmqctl add_user <username> <password>
rabbitmqctl add_user admin qaz123
# 设置权限
# rabbitmqctl set_permissions [-p <vhost>] <username> <configuration> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 设置用户标签
rabbitmqctl set_user_tags admin administrator
# 通常删除默认账户
rabbitmqctl delete_user guest
# 管理账户
# 1、修改密码
rabbitmqctl change_password <username> <newpassword>
# 2、清除密码(无法使用密码登录)
rabbitmqctl clear_password <username>
# 3、清除用户权限(拒绝指定用户访问指定主机)
rabbitmqctl clear_permissions [-p <vhost>] <username>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
提示
更多命令行内容,可以参考官方文档。
经过上面的命令,可以在管理界面看到发生了变化。当然,如果你已经通过 guest
登录且已经在命令行中将其删除,界面会提示你登录失败并尝试让你重新登录。
# 使用
针对不同的使用方式,官方文档 给出了不错的示例。下面介绍基本的使用方式。
示例代码
具体代码可以看 示例代码
# 创建一个生产者
创建一个控制台项目,并安装 RabbitMQ.Client
包。在主函数中直接创建即可:
// 创建连接
using var connection = RabbitMQFactory.Create();
// 创建通道
using var channel = connection.CreateModel();
// 1、声明交换机
channel.ExchangeDeclare("", ExchangeType.Direct, true, false, null);
// 2、声明队列
channel.QueueDeclare("queue", false, false, false, null);
// 3、绑定
channel.QueueBind("queue", "", "queue");
// 循环发送50个消息到 RabbitMQ
const string message = "Producer RabbitMQ Message";
for (var i = 0; i < 50; i++)
{
var body = Encoding.UTF8.GetBytes($"{message}-{i}");
// 发送消息到队列
channel.BasicPublish("", "queue", null, body);
Console.WriteLine($"发布消息 {message}-{i} 到队列。");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
如果你已经开启了 RabbitMQ
,此时运行该生产者之后,消息会保存到消息队列中:
# 创建一个消费者
新创建一个控制台项目,同样安装 RabbitMQ.Client
包。在主函数中直接创建即可:
// 创建连接
using var connection = RabbitMQFactory.Create();
// 创建通道
using var channel = connection.CreateModel();
// 声明队列
channel.QueueDeclare("queue", false, false, false, null);
// 创建基于事件的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"收到消息:{message}");
};
channel.BasicConsume("queue", true, consumer);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
运行之后,可以消费所有队列中的数据:
# 消息持久化
上面的内容,一旦 RabbitMQ
失效了,数据就会丢失。如何做到数据持久化呢,需要如下配置:
using var connection = RabbitMQFactory.Create();
using var channel = connection.CreateModel();
channel.QueueDeclare("queue", false, false, false, null);
// 消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 循环发送50个消息到 RabbitMQ
const string message = "Producer RabbitMQ Message";
for (var i = 0; i < 50; i++)
{
var body = Encoding.UTF8.GetBytes($"{message}-{i}");
// 发送消息到队列。将上面创建的属性添加进来
channel.BasicPublish("", "queue", properties, body);
Console.WriteLine($"发布消息 {message}-{i} 到队列。");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 继续
上面是一个最基本的使用方式。在 官方指导教程 中,包含了包括上面方式的共 7 中模式,其中前 5 中是比较常用的,尤其是第 3、4、5,分别是:
fanout 发布/订阅模式
即:生产者发送一个消息无差别地到所有消息队列,所有消费者都可以通过消息队列获取该消息。
direct 精准推送模式
即:生产者指定队列的
KEY
值,精确推送到指定的队列中,所有可以读取该队列的消费者都可以获取该消息。topic 主题模式
即:生产者可以根据一定规则指定队列的
KEY
值,模糊匹配队列并推送消息到队列中,所有可以读取该队列的消费者都可以获取该消息。
将基本代码贴出来,具体代码可以参照我的 文档配套代码库
# 生产者
// 发布订阅模式
public static void Fanout()
{
const string exchangeName = "fanout_exchange";
const string queueName1 = "fanout_queue1";
const string queueName2 = "fanout_queue2";
const string queueName3 = "fanout_queue3";
using var connection = RabbitMQFactory.Create();
using var channel = connection.CreateModel();
// 声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
// 声明3个队列
channel.QueueDeclare(queueName1, false, false, false, null);
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);
// 将队列绑定到交换机。不写 routingKey,意味着消息将发送到所有队列
channel.QueueBind(queueName1, exchangeName, "");
channel.QueueBind(queueName2, exchangeName, "");
channel.QueueBind(queueName3, exchangeName, "");
// 循环发送100个消息到 RabbitMQ
const string message = "Producer RabbitMQ Message";
for (var i = 1; i <= 20; i++)
{
var body = Encoding.UTF8.GetBytes($"{message}-{i}");
// 发送消息到交换机
channel.BasicPublish(exchangeName, "", null, body);
Console.WriteLine($"发布消息 {message}-{i} 到队列。");
}
}
// 精准推送模式
public static void Direct()
{
const string exchangeName = "direct_exchange";
const string queueName1 = "direct_queue1";
const string queueName2 = "direct_queue2";
const string queueName3 = "direct_queue3";
using var connection = RabbitMQFactory.Create();
using var channel = connection.CreateModel();
// 声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
// 声明3个队列
channel.QueueDeclare(queueName1, false, false, false, null);
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);
// 将队列绑定到交换机。为了达到直发效果,需要填写 routingKey,会按照该 Key 值匹配发送
channel.QueueBind(queueName1, exchangeName, "c");
channel.QueueBind(queueName2, exchangeName, "cpp");
channel.QueueBind(queueName3, exchangeName, "csharp");
// 循环发送100个消息到 RabbitMQ
const string message = "Producer RabbitMQ Message";
for (var i = 1; i <= 20; i++)
{
var body = Encoding.UTF8.GetBytes($"{message}-{i}");
var routingKeys = new[] {"c", "cpp", "csharp"};
// 发送消息到交换机
channel.BasicPublish(exchangeName, routingKeys[i % 3], null, body);
Console.WriteLine($"发布消息 {message}-{i} 到队列。");
}
}
// 主题模式
public static void Topic()
{
const string exchangeName = "topic_exchange";
const string queueName1 = "topic_queue1";
const string queueName2 = "topic_queue2";
const string queueName3 = "topic_queue3";
using var connection = RabbitMQFactory.Create();
using var channel = connection.CreateModel();
// 声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
// 声明3个队列
channel.QueueDeclare(queueName1, false, false, false, null);
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);
// 将队列绑定到交换机。Topic 模式下,绑定时可以填写模糊符号 "*" / "#"
channel.QueueBind(queueName1, exchangeName, "data.*");
channel.QueueBind(queueName2, exchangeName, "data.#");
channel.QueueBind(queueName3, exchangeName, "data.update");
// 循环发送100个消息到 RabbitMQ
const string message = "Producer RabbitMQ Message";
foreach (var key in new [] {"data.update", "data.insert", "data.insert.one"})
{
for (var i = 1; i <= 10; i++)
{
var body = Encoding.UTF8.GetBytes($"{message}-{i}");
// 发送消息到交换机
channel.BasicPublish(exchangeName, key, null, body);
Console.WriteLine($"发布消息 {message}-{i} 到队列,Key 为 {key}。");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# 消费者
// 发布订阅模式
public static void Fanout()
{
// 创建连接
using var connection = RabbitMQFactory.Create();
// 创建通道
using var channel = connection.CreateModel();
const string exchangeName = "fanout_exchange";
const string queueName2 = "fanout_queue2";
// 声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
// 声明队列
channel.QueueDeclare(queueName2, false, false, false, null);
// 将队列绑定到交换机
channel.QueueBind(queueName2, exchangeName, "");
// 创建基于事件的消费者
var consumer = new EventingBasicConsumer(channel);
// 设置 prefetchCount
channel.BasicQos(0, 1, false);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"收到消息:{message}");
};
// 消费第二个队列中的消息
channel.BasicConsume(queueName2, true, consumer);
}
// 精准推送模式
public static void Direct()
{
// 创建连接
using var connection = RabbitMQFactory.Create();
// 创建通道
using var channel = connection.CreateModel();
const string exchangeName = "direct_exchange";
const string queueName1 = "direct_queue1";
// 声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
// 声明队列
channel.QueueDeclare(queueName1, false, false, false, null);
// 将队列绑定到交换机
channel.QueueBind(queueName1, exchangeName, "c");
// 创建基于事件的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"收到消息:{message},key:{ea.RoutingKey}");
};
// 消费第一个队列中的消息
channel.BasicConsume(queueName1, true, consumer);
}
// 主题模式
public static void Topic()
{
// 创建连接
using var connection = RabbitMQFactory.Create();
// 创建通道
using var channel = connection.CreateModel();
const string exchangeName = "topic_exchange";
const string queueName1 = "topic_queue1";
// 声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
// 声明队列
channel.QueueDeclare(queueName1, false, false, false, null);
// 将队列绑定到交换机
channel.QueueBind(queueName1, exchangeName, "data.delete");
// 创建基于事件的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"收到消息:{message},key:{ea.RoutingKey}");
};
// 消费第一个队列中的消息
channel.BasicConsume(queueName1, true, consumer);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98