当前位置: 首页 > news >正文

广告设计工作内容谷歌seo运营

广告设计工作内容,谷歌seo运营,重庆免费注册推广网站,品牌建设规划品牌意向目录 RabbitMQ认识概念使用场景优点AMQP协议JMS RabbitMQ安装安装elang安装RabbitMQ安装管理插件登录RabbitMQ消息队列的工作流程 RabbitMQ常用模型HelloWorld-基本消息模型生产者发送消息导包获取链接工具类消息的生产者 消费者消费消息模拟消费者手动签收消息 Work QueuesSen…

目录

      • RabbitMQ认识
        • 概念
        • 使用场景
        • 优点
        • AMQP协议
        • JMS
      • RabbitMQ安装
        • 安装elang
        • 安装RabbitMQ
        • 安装管理插件
        • 登录RabbitMQ
        • 消息队列的工作流程
      • RabbitMQ常用模型
        • HelloWorld-基本消息模型
          • 生产者发送消息
            • 导包
            • 获取链接工具类
            • 消息的生产者
          • 消费者消费消息
            • 模拟消费者
            • 手动签收消息
        • Work Queues
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-FANOUT-广播
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Direct-定向
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Topic-通配符
          • Sender
          • Consume1
          • Consume2
        • 总结
      • SpringBoot集成RabbitMQ
        • 导包
        • yml
        • config
        • producer
        • consumer

RabbitMQ认识

概念

MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官方地址:http://www.rabbitmq.com/

使用场景

在这里插入图片描述

优点

任务异步处理:
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(丢进去由接收方分别异步处理)

消除峰值:
异步化提速(发消息),提高系统稳定性(多系统调用),服务解耦(5-10个服务),排序保证,消除峰值
(放入队列中不用马上都处理完,有中间状态,消息分发后可由多个订阅方分别异步处理)

服务解耦:
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(将单体业务拆分为生产者,消息队列和消费者)

AMQP协议

AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
(其他Python,C#,PHP也都能用)

JMS

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的。
(只能Java用,基本已经被摒弃)

RabbitMQ安装

安装elang

otp_win64_20.2.exe
配置环境变量

安装RabbitMQ

rabbitmq-server-3.7.4.exe
可通过任务管理器或开始菜单启动或关闭服务

安装管理插件

安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新启动RabbitMQ
(开启可视化界面)

重启MQ

登录RabbitMQ

进入浏览器,输入:http://localhost:15672,初始账号和密码:guest/guest
在这里插入图片描述

消息队列的工作流程

在这里插入图片描述

RabbitMQ常用模型

HelloWorld-基本消息模型

一个生产者与一个消费者
在这里插入图片描述

生产者发送消息
导包
<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><!--和springboot2.0.5对应--><version>5.4.1</version></dependency>
</dependencies>
获取链接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");//集群的时候才用这个参数factory.setUsername("guest");factory.setPassword("guest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}
消息的生产者
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//消息的生产者
public class Sender {public static  final  String  HELLO_QUEUE="hello_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列(hello这里用默认的交换机)/*  String queue :队列的名字,可自定义,   boolean durable: 持久化,   boolean exclusive:是否独占;大家都能用,传false,boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,Map<String, Object> arguments:没有其他要传的属性就传false          */channel.queueDeclare(HELLO_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", HELLO_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}

在这里插入图片描述

消费者消费消息
模拟消费者
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");}};//3.监听队列/*queue :队列名字autoAck:自动签收Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}

在这里插入图片描述
在这里插入图片描述
只要消费者不关,生产者发一次消息消费者就自动监听消费一次消息

手动签收消息
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);// 消费者标识System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
//                System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("消费完成----------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个,传false}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收不等于消费成功,处理逻辑走完没有报错才算签收成功Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}

Work Queues

在这里插入图片描述
一个生产者与多个消费者。
默认轮询,也可以改成能者多劳

Sender
//消息的生产者
/*如果有多个消费者监听同一个队列,默认轮询*/
public class Sender {public static  final  String  WORK_QUEUE="work_queue";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建队列/*String queue :队列的名字boolean durable: 持久化boolean exclusive:是否独占boolean autoDelete: 用完即删Map<String, Object> arguments*/channel.queueDeclare(WORK_QUEUE, true, false, false, null);String msg="今天中午吃啥";//4.发送消息channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
public class Consume1 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
//        channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}
Consume2
//模拟消费者
public class Consume2 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量
//        channel.basicQos(1);//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(10000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}

订阅模型-FANOUT-广播

在这里插入图片描述
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

Sender
//消息的生产者
/*变化1.不创建 队列2.创建交换机3.给交换机发送消息*/
public class Sender {public static  final  String  FANOUT_EXCHANGE="fanout_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume1 {public  static final  String FANOUT_QUEUE1="fanout_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume2 {public  static final  String FANOUT_QUEUE2="fanout_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, "");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE2,false,callback);}
}

订阅模型-Direct-定向

在这里插入图片描述
把消息交给符合指定routing key 的队列 一堆或一个

Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static  final  String  DIRECT_EXCHANGE="direct_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(DIRECT_EXCHANGE, "dept", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public  static final  String DIRECT_QUEUE1="direct_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, "emp.delete");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public  static final  String DIRECT_QUEUE2="direct_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, "dept");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE2,false,callback);}
}

订阅模型-Topic-通配符

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static  final  String  TOPIC_EXCHANGE="topic_exchange";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//3.创建交换机/*exchange:交换机的名字type:交换机的类型durable:是否持久化*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);String msg="今天晚上吃啥";//4.发送消息channel.basicPublish(TOPIC_EXCHANGE, "user.insert.add.pubilsh", null, msg.getBytes());channel.close();conn.close();}
}
Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public  static final  String TOPIC_QUEUE1="topic_queue1";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, "user.#");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE1,false,callback);}
}
Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public  static final  String TOPIC_QUEUE2="topic_queue2";public static void main(String[] args) throws Exception {//1.获取连接Connection conn = ConnectionUtil.getConnection();//2.获取通道Channel channel = conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, "email.*");//回调Consumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope);//System.out.println(1/0);System.out.println("消息内容:"+new String(body));System.out.println("------------------------------------");//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck:自动签收    签收 不等于  消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE2,false,callback);}
}

总结

01_hello生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)02_workqueue	默认轮询		可修改(能者多劳)生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)03_fanout	广播	将消息交给所有绑定到交换机的队列(多个消费者都能收到)生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)04_direct	定向	把消息交给符合指定 routing key 的队列 一堆或一个生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)05_topic		通配符	把消息交给符合routing pattern (路由模式) 的队列 一堆或一个生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

SpringBoot集成RabbitMQ

导包

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--spirngboot集成rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>

yml

server:port: 44000
spring:application:name: test‐rabbitmq‐producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /listener:simple:acknowledge-mode: manual #手动签收prefetch: 1 #消费者的消息并发处理数量#publisher-confirms: true #消息发送到交换机失败回调#publisher-returns: true  #消息发送到队列失败回调template:mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

config

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_SPRINGBOOT="exchange_springboot";public static final String QUEUE1_SPRINGBOOT="queue1_springboot";public static final String QUEUE2_SPRINGBOOT="queue2_springboot";//创建一个交换机@Beanpublic Exchange createExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();}//创建两个队列@Beanpublic Queue createQueue1(){return  new Queue(QUEUE1_SPRINGBOOT,true);}@Beanpublic Queue createQueue2(){return  new Queue(QUEUE2_SPRINGBOOT,true);}//把交换机和队列绑定到一起@Beanpublic Binding bind1(){return BindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();}@Beanpublic Binding bind2(){return BindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();}//消费者 还原对象方式(从MQ里取出json转为对象)@Bean("rabbitListenerContainerFactory")public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setPrefetchCount(1);return factory;}//放到消息队列里面的转换(转为json存进MQ)@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}

producer

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test(){/*问题:多系统之间 信息交互  传递对象解决方案:转换为json存储实现:1.fastjson    对象 - josn  (作业)2.重写转换器模式*/for (int i = 0; i < 5; i++) {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT, "email.save", new User(1L,"文达"));}System.out.println("消息发送完毕");}
}

consumer

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;//消费者
@Component
public class Consu {@RabbitListener(queues = {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory = "rabbitListenerContainerFactory")//用这个转换器接public void user(@Payload User user, Channel channel, Message message) throws IOException {System.out.println(message);System.out.println("user队列:"+user);//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = {RabbitMQConfig.QUEUE2_SPRINGBOOT})public void email(@Payload User user,Channel channel,Message message ) throws IOException {System.out.println(message);System.out.println("email队列:"+user);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

队列内容可传string,entity序列化对象,json对象,

http://www.wooajung.com/news/22554.html

相关文章:

  • 网站工程师简历北京软件开发公司
  • 西安米德建站网店运营
  • 有什么网站可以下做闭软件网站综合排名信息查询
  • 做网站需要nba表格长春网站建设
  • 网站 建设 函企拓客软件多少钱
  • spring mvc 网站开发百度之家
  • 做茶道网站磁力猫torrent kitty
  • wordpress 吧株洲百度seo
  • 做调查问卷赚钱网站有哪些关键词林俊杰在线听免费
  • 石家庄网站建站全网营销外包
  • 英文域名在哪个网站查询站长统计工具
  • 自己怎样做网站沈阳seo技术
  • 广东三网合一网站建设报价网页推广怎么做
  • 网站开发研究方法网络项目资源网
  • 请私人做网站风险杭州seo平台
  • 公司网站建设合同模板下载武汉seo结算
  • 婚庆网站建设论文哪里注册域名最便宜
  • 做化妆品网站的原因免费网络推广网址
  • 上传到网站的根目录中怎么联系百度客服
  • 手机上怎么分享wordpress沈阳seo优化新势力
  • 中山手机网站建设电话互联网广告行业
  • 英文网站怎么做seoseo快速排名源码
  • 柳城网站开发小视频网站哪个可以推广
  • 校园网站建设调查问卷网络营销方案例文
  • 如何制作私人网站微信小程序开发
  • 专门做av字幕的网站苏州网站建设公司
  • 上海建设厅网站首页浙江疫情最新消息
  • 简单的手机网页制作绍兴seo推广
  • 做h5的网站有哪些百度投流运营
  • 提供网站建设报价2022最近十大的新闻热点