对方的postfix服务器没有正常启动,或许网卡或者别的原因,比如你加过密,密码验证错误
我的作业,你凑合着用吧
//msgq_send.c
#include stdio.h
#include string.h
#include sys/msg.h
#define MAXSIZE 256
int main(int argc, char *argv[])
{
if (argc 2)
{
printf("Error args\n");
return -1;
}
int msgid;
msgid = msgget((key_t)2000, IPC_CREAT | 0644);
if (msgid == -1)
{
printf("msgget error\n");
return -1;
}
if (msgsnd(msgid, (void *)argv[1], MAXSIZE, 0) == -1)
{
printf("msgsnd error\n");
return -1;
}
return 0;
}
//msgq_recv.c
#include stdio.h
#include string.h
#include sys/msg.h
#define MAXSIZE 256
int main(void)
{
int msgid;
int msgsize;
char buff[MAXSIZE];
msgid = msgget((key_t)2000, IPC_CREAT | 0644);
if (msgid == -1)
{
printf("msgget error\n");
return -1;
}
msgsize = msgrcv(msgid, (void *)buff, MAXSIZE, 0, 0);
if (msgsize == -1)
{
printf("msgrcv error\n");
return -1;
}
printf("%s\n", buff);
return 0;
}
//Makefile
TARGET := msgq_send msgq_recv
CC := gcc
CFLAGS := -Wall -g
all: msgq_send msgq_recv
msgq_send: msgq_send.o
$(CC) $(CFLAGS) $^ -o $@
msgq_recv: msgq_recv.o
$(CC) $(CFLAGS) $^ -o $@
clean:
rm -fr *.o $(TARGET)
.PHONY :clean
[toc]
分析一个消息队列主要从这几个点出来。
在后半部分主要分析了 kafka 对以上几点的保证。
详见下文分析重点分析。
事务支持方面,ONS/RocketMQ较为优秀,但是不支持消息批量操作, 不保证消息至少被消费一次.
Kafka提供完全分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积, 支持批量操作, 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次. 但是官方提供的运维工具不友好,开源社区的运维工具支持的版本一般落后于最新版本的Kafka.
目前使用的MNS服务,拥有HTTP REST API, 使用简单, 数据可靠性高, 但是不保证消息有序,不能回溯数据.
RabbitMQ为重量级消息系统, 支持多协议(很多协议是目前业务用不到的), 但是不支持回溯数据, master挂掉之后, 需要手动从slave恢复, 可用性略逊一筹.
以rcoketMQ为例,他的集群就有
第一眼看到这个图,就觉得和kafka好像,只是NameServer集群,在kafka中是用zookeeper代替,都是用来保存和发现master和slave用的。
通信过程如下:
Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。
Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
那么kafka呢?
为了对比说明直接上kafka的拓补架构图
如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
最骚的一个操作,消费者业务自己去保证幂等性。
换一个说法,如何保证消息队列的幂等性?
另外说一点,幂等性的保证需要在一次请求中所有链路都是幂等的,再能最终保证这次请求的幂等,比如前段按钮点击两次,后端认为都是这是两次不同的请求,当然处理成两次请求,所以说一个请求的幂等性,需要全局的幂等才能保证。
其实无论是哪种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同。
例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下(后续详细解释),就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。
那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?这个问题针对业务场景来答分以下几点
其实这个可靠性传输,每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据。
从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction(事物机制)机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。然而缺点就是吞吐量下降了。
生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
简单来讲 confirm模式就是生产者发送请求,到了消息队列,消息队列会回复一个消息收到的应答,如果没收到,生产者开始重试。
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息(但是消息队列那边已经认为消息被消费了),就会丢失该消息。
至于解决方案,采用手动确认消息即可。
kafka为例
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader中pull数据。
在kafka生产中,基本都有一个leader和多个follwer。follwer会去同步leader的信息。因此,为了避免生产者丢数据,做如下两点配置
针对消息队列丢数据的情况,无外乎就是,数据还没同步,leader就挂了,这时zookpeer会将其他的follwer切换为leader,那数据就丢失了。针对这种情况,应该做两个配置。
这种情况一般是自动提交了offset,然后你处理程序过程中挂了。kafka以为你处理好了。再强调一次offset是干嘛的。
offset:指的是kafka的topic中的每个消费组消费的下标。简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。
针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue)。然后只用一个消费者去消费该队列。
有的人会问:那如果为了吞吐量,有多个消费者去消费怎么办?
简单来说消息的时序性也可以通过错误重试来解决。
比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。如果是这样一个业务场景,那只要重试就行。比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。
总之,针对这个问题,我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。
为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。
单个parition内是保证消息有序。
订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。
同一个消费组中的两个消费者,只能消费一个partition。
换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。
如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
kafka api 提供了很多功能比如
生产者能指定 topic 和 Partition 来投递消息,并且还有延迟消息,事务消息等等,详见下面的 api 文档
这个是 api 的中文文档
Kakfa Broker集群受Zookeeper管理。
这里先说下
关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。
所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,并且只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为 Kafka Broker Controller ,其他的Kafka broker叫 Kafka Broker follower 。(这个过程叫Controller在ZooKeeper注册Watch)。
这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点。
Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。
最少1次(at most once):可能会重传数据,有可能出现数据被重复处理的情况;
最多1次(at least once):可能会出现数据丢失情况;
恰好1次(Exactly once):并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。
操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。
每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。
我觉得的靠的是这两个参数
这篇主要从生产和消费的角度详细给出的过程
一:UCOS是一种抢占式的多任务操作系统,如果最高优先级的任务不主动放弃CPU的使用的话,其他任务是无法运行的,通常情况下,高优先级的任务在使用完CPU或其他资源后都要主动放弃,可以通过延时函数或者时等待一些信号量之类的让自己挂起。但是如果最高优先级任务一直使用CPU,那就跟单任务没有什么区别了。
二:可以通过等待信号量,消息等是当前任务挂起,或者通过通过延时函数将任务挂起,从而让其他优先级的任务运行。
UC/OS的信号量,消息队列,邮箱的区别
信号量像一把钥匙,任务要运行下去,需先拿到这把钥匙。
消息邮箱是一个指针型变量。可以向一个任务或一个中断服务子程序发送一则消息(一个指针),同样,一个或多个任务通过内核服务,可以接收这则消息。消息邮箱也可以当作只取2个值的信号量来用。
消息队列实际上是邮箱阵列。
消息队列本质上是位于内核空间的链表,链表的每个节点都是一条消息。每一条消息都有自己的消息类型,消息类型用整数来表示,而且必须大于 0。每种类型的消息都被对应的链表所维护:
其中数字 1 表示类型为 1 的消息,数字2、3、4 类似。彩色块表示消息数据,它们被挂在对应类型的链表上。
值得注意的是,刚刚说过没有消息类型为 0 的消息,实际上,消息类型为 0 的链表记录了所有消息加入队列的顺序,其中红色箭头表示消息加入的顺序。
无论你是发送还是接收消息,消息的格式都必须按照规范来。简单的说,它一般长成下面这个样子:
所以,只要你保证首4字节(32 位 linux 下的 long)是一个整数就行了。
举个例子:
从上面可以看出,正文部分是什么数据类型都没关系,因为消息队列传递的是 2 进制数据,不一定非得是文本。
msgsnd 函数用于将数据发送到消息队列。如果该函数被信号打断,会设置 errno 为 EINTR。
参数 msqid:ipc 内核对象 id
参数 msgp:消息数据地址
参数 msgsz:消息正文部分的大小(不包含消息类型)
参数 msgflg:可选项
该值为 0:如果消息队列空间不够,msgsnd 会阻塞。
IPC_NOWAIT:直接返回,如果空间不够,会设置 errno 为 EAGIN.
返回值:0 表示成功,-1 失败并设置 errno。
msgrcv 函数从消息队列取出消息后,并将其从消息队列里删除。
参数 msqid:ipc 内核对象 id
参数 msgp:用来接收消息数据地址
参数 msgsz:消息正文部分的大小(不包含消息类型)
参数 msgtyp:指定获取哪种类型的消息
msgtyp = 0:获取消息队列中的第一条消息
msgtyp 0:获取类型为 msgtyp 的第一条消息,除非指定了 msgflg 为MSG_EXCEPT,这表示获取除了 msgtyp 类型以外的第一条消息。
msgtyp 0:获取类型 ≤|msgtyp|≤|msgtyp| 的第一条消息。
参数 msgflg:可选项。
如果为 0 表示没有消息就阻塞。
IPC_NOWAIT:如果指定类型的消息不存在就立即返回,同时设置 errno 为 ENOMSG
MSG_EXCEPT:仅用于 msgtyp 0 的情况。表示获取类型不为 msgtyp 的消息
MSG_NOERROR:如果消息数据正文内容大于 msgsz,就将消息数据截断为 msgsz
程序 msg_send 和 msg_recv 分别用于向消息队列发送数据和接收数据。
msg_send 程序定义了一个结构体 Msg,消息正文部分是结构体 Person。该程序向消息队列发送了 10 条消息。
msg_send.c
程序 msg_send 第一次运行完后,内核中的消息队列大概像下面这样:
msg_recv 程序接收一个参数,表示接收哪种类型的消息。比如./msg_recv 4 表示接收类型为 4 的消息,并打印在屏幕。
先运行 msg_send,再运行 msg_recv。
接收所有消息
接收类型为 4 的消息
获取和设置消息队列的属性
msqid:消息队列标识符
cmd:控制指令
IPC_STAT:获得msgid的消息队列头数据到buf中
IPC_SET:设置消息队列的属性,要设置的属性需先存储在buf中,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes
buf:消息队列管理结构体。
返回值:
成功:0
出错:-1,错误原因存于error中
EACCESS:参数cmd为IPC_STAT,确无权限读取该消息队列
EFAULT:参数buf指向无效的内存地址
EIDRM:标识符为msqid的消息队列已被删除
EINVAL:无效的参数cmd或msqid
EPERM:参数cmd为IPC_SET或IPC_RMID,却无足够的权限执行
救命啊~~已经发出去的电子邮件能撤回么~~ 已经发出的邮件是可以撤回的。首先确保你的Outlook邮件已更新至最新版本,以下图为例:一、打开邮件二、进入邮件界面,在“开始”模块中找到“已发送邮件”按钮...
向凤凰网投稿有哪些途径? 凤凰网投稿途径:投稿信箱huaxifukan@qq.com凤凰网(凤凰新媒体,纽交所代码:FENG)是全球领先的跨平台网络新媒体公司,整合旗下综合门户凤凰网、手机凤凰网和凤凰...
庐江城市公共交通有限公司电话是多少? 庐江县水务集团有限责任公司联系方式:公司电话0551-87322750,公司邮箱ahljsc@16com,该公司在爱企查共有3条联系方式,其中有电话号码1条。合肥...
百度搜索显示该网站可能因黑客侵入而存在安全风险怎么办 有条件建议找专业做网站安全的sine安全来做安全维护。一:挂马预防措施:建议用户通过ftp来上传、维护网页,尽量不安装asp的上传程序。找到“上网...
广西玉林市兴业县位于哪个省哪个市 兴业县属于广西壮族自治区玉林市。兴业县是广西壮族自治区玉林市下辖的一个县级行政区,位于广西的中北部,南濒北部湾,北临福建省,东邻容县和陆川县,西靠博白县,地处广西壮族...
qq邮箱在哪里打开 在qq的界面中,通过点击顶部的搜索框,进入到搜索界面。 在搜索的界面中,点击搜索框,接着在搜索框内输入qq邮箱提醒。 成功输入后,点击QQ邮箱提醒的订阅号,进入新界面。 进入新界面...