RabbitMQ-Direct模式


简介

 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息中间件,基于Erlang语言编写。

 

P:(producling)生产者,生产只意味着发送消息。

 

Q: (queue_name)队列,队列是位于rabbitmq中的post box的名称

 

C: (Consuming)消费者,消费者主要是等待接收消息的程序

 

 

开发准备

  •  netCoreTset.core:该工程主要封装了RabbitMQ的公用方法
  • RabbitMQClient    :该工程为生产者
  • RabbitMQServer  :该工程为消费者

 

1.创建netCoreTset.core类库项目

 

1.1 安装项目依赖

 

2.定义接口

using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    public interface IConnectionServer
    {
      
        /// <summary>
        /// 连接服务
        /// </summary>
        void Connection();
        /// <summary>
        /// 创建消息队列
        /// </summary>
        /// <param name="queName">队列名称</param>
        void CreateQueueDir();
        /// <summary>
        /// 关闭连接
        /// </summary>
        void CloseConnection();
        /// <summary>
        /// 关闭通道
        /// </summary>
        void CloseChannel();


    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    public interface IMessageService
    {
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="msg">消息内容</param>
        void SendMsg(string msg);
        /// <summary>
        /// 获取消息
        /// </summary>
        /// <returns></returns>
        string GetMsg();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
   public interface IRabbitMqService:IMessageService,IConnectionServer
    {
    }
}

 

 3.编写RabbitMQ辅助类

using netCoreTest.core.Iserver;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core
{
    public class RabbitMQModel : IRabbitMqService
    {

        private readonly ConnectionFactory factory = null;
        private IModel channel;
        private IConnection connetction;
        readonly string exchangeName;//交换机名称
        readonly string routeKey;//路由名称
        readonly string queueName;///队列名称
        public RabbitMQModel(HostModel model)
        {
            /// <summary>
            /// 创建连接工厂
            /// </summary>
            factory = new ConnectionFactory
            {
                UserName = model.UserName,
                Password = model.PassWord,
                HostName = "localhost",
                Port = model.Port,
            };
            exchangeName = model.ExChangeModel.ExChangeName;
            routeKey = model.ExChangeModel.RouteKey;
            queueName = model.ExChangeModel.QueueName;
        }
        /// <summary>
        /// 创建连接
        /// </summary>
        public void Connection()
        {
            try
            {
                //创建连接
                connetction = factory.CreateConnection();
                //创建信道
                channel = connetction.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        public void CreateQueueDir()
        {
            //定义一个direct类型的交换机
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
            //定义一个队列
            channel.QueueDeclare(queueName, false, false, false, null);
            //将队列绑定交换机
            channel.QueueBind(queueName, exchangeName, routeKey, null);
        }public void SendMsg(string msg)
        {
            var sendBytes = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
        }

        public void CloseChannel()
        {
            channel.Close();
        }

        public void CloseConnection()
        {
            connetction.Close();
        }

        public string GetMsg()
        {
            //事件基本消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            string msg = null;
            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                msg = message;
                Console.WriteLine($"收到消息: {message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();
            CloseConnection();
            CloseChannel();
            return msg;
        }


    }
}

4.创建direct模式发送类

using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.ExchangeTypeModel
{

    /// <summary>
    /// Direct模式发送
    /// </summary>
    public class DirectPost
    {


        RabbitMQModel rabbitMQModel;

        public DirectPost()
        {
            HostModel hostModel = new HostModel();
            hostModel.UserName = "admin";
            hostModel.PassWord = "admin";
            hostModel.Host = "127.0.0.1";
            hostModel.Port = 5672;
            hostModel.ExChangeModel =new ExChangeModel {
                ExChangeName = "ClentName",
                QueueName = "Clent",
                RouteKey = "ClentRoute"
            };
            rabbitMQModel = new RabbitMQModel(hostModel);
            rabbitMQModel.Connection();

        }
        public void CreateQueue()
        {
            rabbitMQModel.CreateQueueDir();
        }
        public void SendMsg(string msg)
        {
            rabbitMQModel.SendMsg(msg);
        }
        public void GetMsg()
        {
            rabbitMQModel.GetMsg();
        }
    }
}

5.创建RabbitMQClient控制台应用程序

 

 

using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using System;

namespace RabbitMQClient
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("消息生产者开始生产数据!");
            Console.WriteLine("输入exit退出!");
            DirectPost directPost = new DirectPost();
            directPost.CreateQueue();
            string input;
           
            do
            {
                input = Console.ReadLine();
                directPost.SendMsg(input);

            } while (input.Trim().ToLower() != "exit");


        }
    }
}

6.创建RabbitMQService控制台应用程序

using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using System;
using System.Text;

namespace RabbitMQServer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            DirectPost directPost = new DirectPost();
            directPost.GetMsg();
        

        }
    }
}

7.执行RabbitMQclient和RabbitMQserver

 


作者:zyz1,发布于:2019/05/16
原文:https://www.cnblogs.com/zhengyazhao/p/10869982.html