在未来商业智能中的作用
|
最终控制台结果, 确实实现了延时队列的功能: 2020-01-12 11:14:17.582 INFO 12032 — [main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—– 2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-2] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体 2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-1] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体 除了队列 TTL 以外,粒度为消息级别的 TTL 也是可以设置的。 SpringAMQP 对单条消息的 TTL 设置,需要在 MessageProperties 类中进行,每个消息都会内置一个此类。
为了方便,SpringAMQP 在消息发送流程中提供了一个钩子可以让我们设置 Message 的属性,那就是 MessagePostProcessor 以看到我定义了关于 普通队列相关 以及 死信队列相关 的几个常量。 并且基于这些常量实例化出了对应的交换机、队列,并设置了绑定关系。 在实例化普通队列时对其进行了特殊处理; 给普通队列绑定上了死信交换机,并指定好死信 routing key。指定好了其 TTL 值 (5s 过期) 后才进行实例化。 那么现在以这么一个配置,就已经实现了延时消息需要的所有条件了。 写个消费者、发送者来测试一下。
消费者: Kafka 做的事情就是,把这个数据搬运的次数,从上面的四次,变成了两次,并且只有 DMA 来进行数据搬运,而不需要 CPU fileChannel.transferTo(position, count, socketChannel);
Kafka 的代码调用了 Java NIO 库,具体是 FileChannel 里面的 transferTo 方法。我们的数据并没有读到中间的应用内存里面,而是直接通过 Channel,写入到对应的网络设备里。并且,对于 Socket 的操作,也不是写入到 Socket 的 Buffer 里面,而是直接根据描述符(Descriptor)写入到网卡的缓冲区里面。于是,在这个过程之中,我们只进行了两次数据传输。 但是,无论 I/O 速度如何提升,比起 CPU,总还是太慢。SSD 硬盘的 IOPS 可以到 2 万、4 万,但是我们 CPU 的主频有 2GHz 以上,也就意味着每秒会有 20 亿次的操作。如果我们对于 I/O 的操作,都是由 CPU 发出对应的指令,然后等待 I/O 设备完成操作之后返回,那 CPU 有大量的时间其实都是在等待 I/O 设备完成操作。但是,这个 CPU 的等待,在很多时候,其实并没有太多的实际意义。我们对于 I/O 设备的大量操作,其实都只是把内存里面的数据,传输到 I/O 设备而已。在这种情况下,其实 CPU 只是在傻等而已。特别是当传输的数据量比较大的时候,比如进行大文件复制,如果所有数据都要经过 CPU,实在是有点儿太浪费时间了。因此,计算机工程师们,就发明了 DMA 技术, 也就是直接内存访问(Direct Memory Access)技术,来减少 CPU 等待的时间
Kafka 是一个用来处理实时数据的管道,我们常常用它来做一个消息队列,或者用来收集和落地海量的日志。作为一个处理实时数据和日志的管道,瓶颈自然也在 I/O 层面。Kafka 里面会有两种常见的海量数据传输的情况。一种是从网络中接收上游的数据,然后需要落地到本地的磁盘上,确保数据不丢失。另一种情况呢,则是从本地磁盘上读取出来,通过网络发送出去。 (编辑:揭阳站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |

