消息邮箱和消息队列,消息队列发送邮箱

hacker1年前黑客服务99

为什么我在linux下给自己的root用户发送邮件的时候连接到别人的邮箱了?

对方的postfix服务器没有正常启动,或许网卡或者别的原因,比如你加过密,密码验证错误

Linux程序设计:关于消息队列 题目:编写两程序,程序1从消息队列接收消息,程序2则发送消息

我的作业,你凑合着用吧

//msgq_send.c

#include stdio.h

#include string.h

#include sys/msg.h

#define MAXSIZE 256

int main(int argc, char *argv[])

{

if (argc 2)

{

printf("Error args\n");

return -1;

}

int msgid;

msgid = msgget((key_t)2000, IPC_CREAT | 0644);

if (msgid == -1)

{

printf("msgget error\n");

return -1;

}

if (msgsnd(msgid, (void *)argv[1], MAXSIZE, 0) == -1)

{

printf("msgsnd error\n");

return -1;

}

return 0;

}

//msgq_recv.c

#include stdio.h

#include string.h

#include sys/msg.h

#define MAXSIZE 256

int main(void)

{

int msgid;

int msgsize;

char buff[MAXSIZE];

msgid = msgget((key_t)2000, IPC_CREAT | 0644);

if (msgid == -1)

{

printf("msgget error\n");

return -1;

}

msgsize = msgrcv(msgid, (void *)buff, MAXSIZE, 0, 0);

if (msgsize == -1)

{

printf("msgrcv error\n");

return -1;

}

printf("%s\n", buff);

return 0;

}

//Makefile

TARGET := msgq_send msgq_recv

CC := gcc

CFLAGS := -Wall -g

all: msgq_send msgq_recv

msgq_send: msgq_send.o

$(CC) $(CFLAGS) $^ -o $@

msgq_recv: msgq_recv.o

$(CC) $(CFLAGS) $^ -o $@

clean:

rm -fr *.o $(TARGET)

.PHONY :clean

soulcoder——消息队列知识总结(偏向于 Kafka)

[toc]

分析一个消息队列主要从这几个点出来。

在后半部分主要分析了 kafka 对以上几点的保证。

详见下文分析重点分析。

事务支持方面,ONS/RocketMQ较为优秀,但是不支持消息批量操作, 不保证消息至少被消费一次.

Kafka提供完全分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积, 支持批量操作, 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次. 但是官方提供的运维工具不友好,开源社区的运维工具支持的版本一般落后于最新版本的Kafka.

目前使用的MNS服务,拥有HTTP REST API, 使用简单, 数据可靠性高, 但是不保证消息有序,不能回溯数据.

RabbitMQ为重量级消息系统, 支持多协议(很多协议是目前业务用不到的), 但是不支持回溯数据, master挂掉之后, 需要手动从slave恢复, 可用性略逊一筹.

以rcoketMQ为例,他的集群就有

第一眼看到这个图,就觉得和kafka好像,只是NameServer集群,在kafka中是用zookeeper代替,都是用来保存和发现master和slave用的。

通信过程如下:

Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。

Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。

那么kafka呢?

为了对比说明直接上kafka的拓补架构图

如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

最骚的一个操作,消费者业务自己去保证幂等性。

换一个说法,如何保证消息队列的幂等性?

另外说一点,幂等性的保证需要在一次请求中所有链路都是幂等的,再能最终保证这次请求的幂等,比如前段按钮点击两次,后端认为都是这是两次不同的请求,当然处理成两次请求,所以说一个请求的幂等性,需要全局的幂等才能保证。

其实无论是哪种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同。

例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下(后续详细解释),就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。

那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

如何解决?这个问题针对业务场景来答分以下几点

其实这个可靠性传输,每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据。

从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

transaction(事物机制)机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。然而缺点就是吞吐量下降了。

生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

简单来讲 confirm模式就是生产者发送请求,到了消息队列,消息队列会回复一个消息收到的应答,如果没收到,生产者开始重试。

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息(但是消息队列那边已经认为消息被消费了),就会丢失该消息。

至于解决方案,采用手动确认消息即可。

kafka为例

Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader中pull数据。

在kafka生产中,基本都有一个leader和多个follwer。follwer会去同步leader的信息。因此,为了避免生产者丢数据,做如下两点配置

针对消息队列丢数据的情况,无外乎就是,数据还没同步,leader就挂了,这时zookpeer会将其他的follwer切换为leader,那数据就丢失了。针对这种情况,应该做两个配置。

这种情况一般是自动提交了offset,然后你处理程序过程中挂了。kafka以为你处理好了。再强调一次offset是干嘛的。

offset:指的是kafka的topic中的每个消费组消费的下标。简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。

比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue)。然后只用一个消费者去消费该队列。

有的人会问:那如果为了吞吐量,有多个消费者去消费怎么办?

简单来说消息的时序性也可以通过错误重试来解决。

比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。如果是这样一个业务场景,那只要重试就行。比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。

总之,针对这个问题,我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。

为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。

单个parition内是保证消息有序。

订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。

同一个消费组中的两个消费者,只能消费一个partition。

换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。

如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。

kafka api 提供了很多功能比如

生产者能指定 topic 和 Partition 来投递消息,并且还有延迟消息,事务消息等等,详见下面的 api 文档

这个是 api 的中文文档

Kakfa Broker集群受Zookeeper管理。

这里先说下

关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。

所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,并且只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为 Kafka Broker Controller ,其他的Kafka broker叫 Kafka Broker follower 。(这个过程叫Controller在ZooKeeper注册Watch)。

这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点。

Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。

最少1次(at most once):可能会重传数据,有可能出现数据被重复处理的情况;

最多1次(at least once):可能会出现数据丢失情况;

恰好1次(Exactly once):并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。

操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。

每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。

我觉得的靠的是这两个参数

这篇主要从生产和消费的角度详细给出的过程

信号量是什么?有什么区别?使用信号量进行任务间通信有何优缺点

一:UCOS是一种抢占式的多任务操作系统,如果最高优先级的任务不主动放弃CPU的使用的话,其他任务是无法运行的,通常情况下,高优先级的任务在使用完CPU或其他资源后都要主动放弃,可以通过延时函数或者时等待一些信号量之类的让自己挂起。但是如果最高优先级任务一直使用CPU,那就跟单任务没有什么区别了。

二:可以通过等待信号量,消息等是当前任务挂起,或者通过通过延时函数将任务挂起,从而让其他优先级的任务运行。

UC/OS的信号量,消息队列,邮箱的区别

信号量像一把钥匙,任务要运行下去,需先拿到这把钥匙。

消息邮箱是一个指针型变量。可以向一个任务或一个中断服务子程序发送一则消息(一个指针),同样,一个或多个任务通过内核服务,可以接收这则消息。消息邮箱也可以当作只取2个值的信号量来用。

消息队列实际上是邮箱阵列。

Linux系统编程—消息队列

消息队列本质上是位于内核空间的链表,链表的每个节点都是一条消息。每一条消息都有自己的消息类型,消息类型用整数来表示,而且必须大于 0。每种类型的消息都被对应的链表所维护:

其中数字 1 表示类型为 1 的消息,数字2、3、4 类似。彩色块表示消息数据,它们被挂在对应类型的链表上。

值得注意的是,刚刚说过没有消息类型为 0 的消息,实际上,消息类型为 0 的链表记录了所有消息加入队列的顺序,其中红色箭头表示消息加入的顺序。

无论你是发送还是接收消息,消息的格式都必须按照规范来。简单的说,它一般长成下面这个样子:

所以,只要你保证首4字节(32 位 linux 下的 long)是一个整数就行了。

举个例子:

从上面可以看出,正文部分是什么数据类型都没关系,因为消息队列传递的是 2 进制数据,不一定非得是文本。

msgsnd 函数用于将数据发送到消息队列。如果该函数被信号打断,会设置 errno 为 EINTR。

参数 msqid:ipc 内核对象 id

参数 msgp:消息数据地址

参数 msgsz:消息正文部分的大小(不包含消息类型)

参数 msgflg:可选项

该值为 0:如果消息队列空间不够,msgsnd 会阻塞。

IPC_NOWAIT:直接返回,如果空间不够,会设置 errno 为 EAGIN.

返回值:0 表示成功,-1 失败并设置 errno。

msgrcv 函数从消息队列取出消息后,并将其从消息队列里删除。

参数 msqid:ipc 内核对象 id

参数 msgp:用来接收消息数据地址

参数 msgsz:消息正文部分的大小(不包含消息类型)

参数 msgtyp:指定获取哪种类型的消息

msgtyp = 0:获取消息队列中的第一条消息

msgtyp 0:获取类型为 msgtyp 的第一条消息,除非指定了 msgflg 为MSG_EXCEPT,这表示获取除了 msgtyp 类型以外的第一条消息。

msgtyp 0:获取类型 ≤|msgtyp|≤|msgtyp| 的第一条消息。

参数 msgflg:可选项。

如果为 0 表示没有消息就阻塞。

IPC_NOWAIT:如果指定类型的消息不存在就立即返回,同时设置 errno 为 ENOMSG

MSG_EXCEPT:仅用于 msgtyp 0 的情况。表示获取类型不为 msgtyp 的消息

MSG_NOERROR:如果消息数据正文内容大于 msgsz,就将消息数据截断为 msgsz

程序 msg_send 和 msg_recv 分别用于向消息队列发送数据和接收数据。

msg_send 程序定义了一个结构体 Msg,消息正文部分是结构体 Person。该程序向消息队列发送了 10 条消息。

msg_send.c

程序 msg_send 第一次运行完后,内核中的消息队列大概像下面这样:

msg_recv 程序接收一个参数,表示接收哪种类型的消息。比如./msg_recv 4 表示接收类型为 4 的消息,并打印在屏幕。

先运行 msg_send,再运行 msg_recv。

接收所有消息

接收类型为 4 的消息

获取和设置消息队列的属性

msqid:消息队列标识符

cmd:控制指令

IPC_STAT:获得msgid的消息队列头数据到buf中

IPC_SET:设置消息队列的属性,要设置的属性需先存储在buf中,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes

buf:消息队列管理结构体。

返回值:

成功:0

出错:-1,错误原因存于error中

EACCESS:参数cmd为IPC_STAT,确无权限读取该消息队列

EFAULT:参数buf指向无效的内存地址

EIDRM:标识符为msqid的消息队列已被删除

EINVAL:无效的参数cmd或msqid

EPERM:参数cmd为IPC_SET或IPC_RMID,却无足够的权限执行

相关文章

邮箱如何取消关联,新浪邮箱怎么取消关联

邮箱如何取消关联,新浪邮箱怎么取消关联

outlook与新浪邮箱关联后如何解除关联 解除outlook与新浪邮箱的关联:1、打开outlook,点工具菜单——帐号设置;2、选择新浪邮箱帐户,点删除。怎样取消新浪邮箱的手机绑定 建议你咨询新浪...

邮箱内容能撤回吗怎么弄,邮箱内容能撤回吗

邮箱内容能撤回吗怎么弄,邮箱内容能撤回吗

邮件应该怎么撤回呢 1、方法二:发送撤回请求 如果您使用的是其他电子邮件服务,如Gmail或Yahoo等,在常规邮件撤回之外,您仍然可以发送撤回请求给接收者。当您发送撤回请求时,接收者将收到一个电子邮...

56邮箱网址,56邮箱拍照

56邮箱网址,56邮箱拍照

为什么都要56邮箱 最近56邮箱()新开发了共享功能,首先你需注册一个56邮箱,登录后在页面菜单上会出现“共享资源”字样,点击进入,输入你要查看的用户名,如abcd(输入用户名时不需要加 @56.co...

上发的文件怎么保存,qq保存的文件怎么发邮箱

上发的文件怎么保存,qq保存的文件怎么发邮箱

如何把一个文件发送到QQ邮箱 首先登录qq。在qq主界面中找到qq邮箱的图标,并点击打开。点击写信。在收件人中填写“自己的邮箱号”。意思就是发给自己。添加文件,点击发送。点击收件箱。如图,文件已经发送...

可以关闭邮箱功能吗,qq可以关闭邮箱功能吗

可以关闭邮箱功能吗,qq可以关闭邮箱功能吗

怎么关闭qq邮箱 1、要关闭QQ邮箱,您需要登录QQ邮箱,进入设置页面,然后在账户选项卡下找到关闭邮箱的选项。2、关闭qq邮箱,需要在qq邮箱账户的安全管理中设置,本答案通过华为手机进行演示,下面是具...

昆明世博园咨询号码,昆明世博园的邮箱

昆明世博园咨询号码,昆明世博园的邮箱

昆明世博园开放时间及门票 1、昆明世博园旅游攻略(营业时间+门票+地点+交通)营业时间:全年08:30-17:00*营业时间仅供参考,如果变动,请以官方当日实际披露为准。2、昆明世博园将于2023年4...

评论列表

访客
2023-01-25 23:34:29

久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么

访客
2023-01-26 04:13:32

为什么我在linux下给自己的root用户发送邮件的时候连接到别人的邮箱了?对方的postfix服务器没有正常启动,或许网卡或者别的原因,比如你加过密,密码验证错误Linux程序设计:关于消息队列 题目:编写两程序,程序1从消息队列接收消息,程序2则发送

访客
2023-01-26 02:11:29

? 另外说一点,幂等性的保证需要在一次请求中所有链路都是幂等的,再能最终保证这次请求的幂等,比如前段按钮点击两次,后端认为都是这是两次不同的请求,当然处理成两次请求,所以

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。