hadoop zookeeperr连接过期问题。

8 ZooKeeper的操作与编程_图文_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
8 ZooKeeper的操作与编程
阅读已结束,如果下载本文需要使用
想免费下载本文?
下载文档到电脑,查找使用更方便
还剩27页未读,继续阅读
你可能喜欢[ZooKeeper]Client Session失效 - 吊丝码农 - ITeye技术网站
前面一篇文章提到zookeeper server端主动发现session超时并清理session信息,关闭连接,接下来看看client端如何试图恢复session的。关于client端代码分析见前文/blog/1754611 。由于session被清理,此时server端已经没有session信息了。而由于连接被关闭,client会抛出异常
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc & 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
SendThread处理异常
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
(e.getMessage() + RETRY_CONN_MSG);
//连接被关闭了
else if (e instanceof EndOfStreamException) {
(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
(e.getMessage());
"Session 0x"
+ Long.toHexString(getSessionId())
+ " for server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", unexpected error"
+ RETRY_CONN_MSG, e);
//清理连接,失败当前请求
cleanup();
//此时state还是CONNECTED,发送DISCONNECTED状态通知
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
接下来client重新寻找下一个server进行session恢复,此时client的sessionId和password仍然是上一次创建的session信息。
//此处返回true,因为连接已经被置为null了
if (!clientCnxnSocket.isConnected()) {
//重连时,先sleep一下
if(!isFirstConnect){
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
//重新开始连接
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
取下一个server
addr = hostProvider.next(1000);
之后就和新建session时类似,区别是发送ConnectRequest时sessionid和password都是老的
//新建session成功时,seenRwServerBefore已经被置为true
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
接下来server端处理
//此时sessionId不为0
long sessionId = connReq.getSessionId();
if (sessionId != 0) {
long clientSessionId = connReq.getSessionId();
("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
//先关闭老的连接,如果有的话,删除watch
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);
重启session
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
//检查密码,如果不一样,则结束session,返回client一个为0的sessionid。如果sessionid为0,则为false
if (!checkPasswd(sessionId, passwd)) {
finishSessionInit(cnxn, false);
//密码正确,再校验下session是否还有效,这里不同的server处理不一样
revalidateSession(cnxn, sessionId, sessionTimeout);
重试的server如果是leader
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
//父类中通过sessionTrack检查
super.revalidateSession(cnxn, sessionId, sessionTimeout);
// setowner as the leader itself, unless updated
// via the follower handlers
setOwner(sessionId, ServerCnxn.me);
} catch (SessionExpiredException e) {
// this is ok, it just means that the session revalidation failed.
父类revalidateSession方法
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId) +
" is valid: " + rc);
finishSessionInit(cnxn, rc);
leader的SessionTracker为SessionTrackerImpl,touchSession方法如下
synchronized public boolean touchSession(long sessionId, int timeout) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.CLIENT_PING_TRACE_MASK,
"SessionTrackerImpl --- Touch session: 0x"
+ Long.toHexString(sessionId) + " with timeout " + timeout);
//因为session超时,session已经被删掉了,此处返回null,所以检查结果是false
SessionImpl s = sessionsById.get(sessionId);
// Return false, if the session doesn't exists or marked as closing
if (s == null || s.isClosing()) {
对于leader来说session检查的结果是false。
如果是follower,其校验方法
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
//需要询问leader,session是否还有效
getLearner().validateSession(cnxn, sessionId, sessionTimeout);
follower询问session是否有效
void validateSession(ServerCnxn cnxn, long clientId, int timeout)
throws IOException {
("Revalidating client: 0x" + Long.toHexString(clientId));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeLong(clientId);
dos.writeInt(timeout);
dos.close();
//REVALIDATE包用来检查session是否还有效
QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
.toByteArray(), null);
pendingRevalidations.put(clientId, cnxn);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"To validate session 0x"
+ Long.toHexString(clientId));
writePacket(qp, true);
leader端处理REVALIDATE包
case Leader.REVALIDATE:
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
//这里由于session已经被删掉,返回false
boolean valid = leader.zk.touch(id, to);
if (valid) {
//set the session owner
// as the follower that
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: "+ valid);
//结果是false
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
queuedPackets.add(qp);
follower处理返回结果
case Leader.REVALIDATE:
revalidate(qp);
protected void revalidate(QuorumPacket qp) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
long sessionId = dis.readLong();
boolean valid = dis.readBoolean();
ServerCnxn cnxn = pendingRevalidations
.remove(sessionId);
if (cnxn == null) {
LOG.warn("Missing session 0x"
+ Long.toHexString(sessionId)
+ " for validation");
zk.finishSessionInit(cnxn, valid);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId)
+ " is valid: " + valid);
可以看到无论是leader还是follower最后都会调用zk.finishSessionInit(cnxn, valid)处理,而由于session已经失效,所以valid为false
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
//由于valid是false,所以返回给client的sessionid为0,password为空
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(
this instanceof ReadOnlyZooKeeperServer, "readOnly");
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
cnxn.sendBuffer(bb);
client端处理ConnectResponse
void readConnectResult() throws IOException {
//被server重置为0了
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionT
//sessionid为0,抛出SessionExpiredException异常
if (negotiatedSessionTimeout &= 0) {
//state设为CLOSED,这个行为将关闭SendThread
state = States.CLOSED;
//Expired状态通知
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
//这个行为将关闭EventThread
eventThread.queueEventOfDeath();
//抛出异常
throw new SessionExpiredException(
"Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired");
SendThread处理SessionExpiredException,关闭SendThread
while (state.isAlive()) {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
(e.getMessage() + ", closing socket connection");
//关闭连接,失败所有请求
cleanup();
//此时state已经被置为CLOSED,SendThread将退出
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
cleanup();
//关闭selector
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exitedloop.");
private void cleanup() {
//关闭socket
clientCnxnSocket.cleanup();
//发送完等待响应的请求失败,此时由于state是CLOSED,所以异常信息是SESSIONEXPIRED
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
conLossPacket(p);
pendingQueue.clear();
//等待发送的请求失败,此时由于state是CLOSED,所以异常信息是SESSIONEXPIRED
synchronized (outgoingQueue) {
for (Packet p : outgoingQueue) {
conLossPacket(p);
outgoingQueue.clear();
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
//关闭的时候,是SESSIONEXPIRED异常码
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
//其他是CONNECTIONLOSS异常码
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
finishPacket(p);
EventThread关闭
public void run() {
isRunning =
while (true) {
Object event = waitingEvents.take();
//kill signal
if (event == eventOfDeath) {
wasKilled =
processEvent(event);
if (wasKilled)
//等所有通知都发完再退出
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning =
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
("EventThread shut down");
之后client将不可用,所有请求都将发送的时候都将收到SESSIONEXPIRED异常码,因为queuePacket的时候一个判断
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
Packet packet =
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb =
packet.ctx =
packet.clientPath = clientP
packet.serverPath = serverP
//此时状态为CLOSED
if (!state.isAlive() || closing) {
conLossPacket(packet);
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
outgoingQueue.add(packet);
sendThread.getClientCnxnSocket().wakeupCnxn();
从以上分析可知,SESSIONEXPIRED异常码是比较严重的事件,之后这个zookeeper实例不可用了,如果需要恢复,则需要重新创建zookeeper实例。而CONNECTIONLOSS异常码是比较常见的,比如网络暂时中断的时候,这个状态码下zookeeper会自动重连恢复,因为server端还保留着session信息。
这篇文章说的更全面,不错不错,学习学习
浏览: 126482 次
来自: 杭州
我想知道淘宝HSF使用OSGi集成进去 目的是什么???有哪些 ...
想问下你hbase版本是多少?我的是94.2,看到有些少少差异 ...
请问如何设置maxVersion呢?现在默认确实只保留3个版本 ...
讲了代码,但是没讲原理,我觉得其实原理更重要一些
看了一些,总结的还可以~用心创造滤镜
扫码下载App
汇聚2000万达人的兴趣社区下载即送20张免费照片冲印
扫码下载App
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!&&|&&
关注海量数据存储、处理和检索,MySQL,系统运维,图像处理等技术
LOFTER精选
网易考拉推荐
kafka系列教程5(客户端实践)&&
14:52:56|&&分类:
用微信&&“扫一扫”
将文章分享到朋友圈。
用易信&&“扫一扫”
将文章分享到朋友圈。
可以使用服务器端下载的kafka二进制包及依赖,也可以通过mavne获取(注意实测发现该方式拿到的包是用jdk7打的):
&dependency&
&groupId&com.sksamuel.kafka&/groupId&
&artifactId&kafka_2.10&/artifactId&
&version&0.8.0-beta1&/version&
&/dependency&生产者下面是开发生产者代码的例子:
Properties props = new Properties();
//指定kafka节点:注意这里无需指定集群中所有Boker,只要指定其中部分即可,它会自动取meta信息并连接到对应的Boker节点
props.put("metadata.broker.list", "172.17.1.163:9093");
//指定采用哪种序列化方式将消息传输给Boker,你也可以在发送消息的时候指定序列化类型,不指定则以此为默认序列化类型
props.put("serializer.class", "kafka.serializer.StringEncoder");
//指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也可以在发送消息的时候指定分区类型。
props.put("partitioner.class", "example.producer.SimplePartitioner");
//该属性表示你需要在消息被接收到的时候发送ack给发送者。以保证数据不丢失
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
//申明生产者:泛型1为分区key类型,泛型2为消息类型
Producer&String, String& producer = new Producer&String, String&(config);
//创建KeyedMessage发送消息,参数1为topic名,参数2为分区名(若为null则随机发到一个分区),参数3为消息
producer.send(new KeyedMessage&String,String&("topic","partitionKey1","msg1"));
producer.close();//自定义分区:public class SimplePartitioner implements Partitioner&String& {& & public SimplePartitioner (VerifiableProperties props) {& & }&& & public int partition(String key, int a_numPartitions) {& & & &return key.length()%a_numP& }&}消费者消费者api分上层api和底层api,这里是采用上层api的消费者例子(无需关系消息的offset,只是希望获得数据)注意:1.上层api将会内部实现持久化每个分区最后读到的消息的offset,数据保存在zookeeper中的消费组名中(如/consumers/id1/offsets/test2/2。其中id1是消费组,test2是topic,最后一个2表示第3个分区),每间隔一个很短的时间更新一次offset,那么可能在重启消费者时拿到重复的消息。此外,当分区leader发生变更时也可能拿到重复的消息。因此在关闭消费者时最好等待一定时间(10s)然后再shutdown()2.消费组名是一个全局的信息,要注意在新的消费者启动之前旧的消费者要关闭。如果新的进程启动并且消费组名相同,kafka会添加这个进程到可用消费线程组中用来消费topic和触发重新分配负载均衡,那么同一个分区的消息就有可能发送到不同的进程中。3.如果消费的线程多于分区数,一些线程可能永远无法看到一些消息。4.如果分区数多于线程数,一些线程会收到多个分区的消息5.如果一个线程对应了多个分区,那么接收到的消息是不能保证顺序的。备注:可用zk的命令查询:get /consumers/id1/owners/test3/2其中id1为消费组,test3为topic,2为分区3.查看里面的内容如:id1_163-PC-4-1091aef2-1表示该分区被该标示的线程所执行。下面举例:
Properties props = new Properties();
// 指定zookeeper服务器地址
props.put("zookeeper.connect", "172.17.1.163:2181");
// 指定消费组(没有它会自动添加)
props.put("group.id", "id1");
// 指定kafka等待多久zookeeper回复(ms)以便放弃并继续消费。
props.put("zookeeper.session.timeout.ms", "4000");
// 指定zookeeper同步最长延迟多久再产生异常
props.put("zookeeper.sync.time.ms", "2000");
// 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
props.put("mit.interval.ms", "1000");
ConsumerConnector consumer = Consumer
.createJavaConsumerConnector(new ConsumerConfig(props));
// 我们要告诉kafka该进程会有多少个线程来处理对应的topic
Map&String, Integer& topicCountMap = new HashMap&String, Integer&();
int a_numThreads = 3;
// 用3个线程来处理topic:test2
topicCountMap.put("test2", a_numThreads);
// 拿到每个stream对应的topic
Map&String, List&KafkaStream&byte[], byte[]&&& consumerMap = consumer
.createMessageStreams(topicCountMap);
List&KafkaStream&byte[], byte[]&& streams = consumerMap.get("test2");
// 调用thread pool来处理topic
ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator&byte[], byte[]& it = stream.iterator();
while (it.hasNext()) {
System.out.println(Thread.currentThread() + ":"
+ new String(it.next().message()));
System.in.read();
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();下面是采用底层api实现的消费者(需要多次读消息,或从部分分区里读数据,或用事务保证消息只处理一次)注意:1.你必须自己实现当停止消费时如何持久化offset2.你必须自己找到哪个broker是leader以便处理topic和分区3.你必须自己处理leader变更使用阶段:1.找到那些broker是leader以便读取topic和partition2.自己决定哪个副本作为你的topic和分区3.建立自己需要请求并自定义获取你感兴趣的数据4.获取数据5.当leader变更时自己识别和恢复。例子:
String topic = "test2";
int partition = 1;
String brokers = "172.17.1.163:9093";
int maxReads = 100; // 读多少条数据
// 1.找leader
PartitionMetadata metadata =
for (String ipPort : brokers.split(",")) {
//我们无需要把所有的brokers列表加进去,目的只是为了获得metedata信息,故只要有broker可连接即可
SimpleConsumer consumer =
String[] ipPortArray = ipPort.split(":");
consumer = new SimpleConsumer(ipPortArray[0],
Integer.parseInt(ipPortArray[1]),
"leaderLookup");
List&String& topics = new ArrayList&String&();
topics.add(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
// 取meta信息
TopicMetadataResponse resp = consumer.send(req);
//获取topic的所有metedate信息(目测只有一个metedata信息,何来多个?)
List&TopicMetadata& metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
//获取每个meta信息的分区信息,这里我们只取我们关心的partition的metedata
System.out.println("----"+part.partitionId());
if (part.partitionId() == partition) {
metadata =
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + ipPort
+ "] to find Leader for [" + topic + ", " + partition
+ "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
if (metadata == null || metadata.leader() == null) {
System.out.println("meta data or leader not found, exit.");
// 拿到leader
Broker leadBroker = metadata.leader();
// 获取所有副本
System.out.println(metadata.replicas());
// 2.获取lastOffset(这里提供了两种方式:从头取或从最后拿到的开始取,下面这个是从头取)
long whichTime = kafka.api.OffsetRequest.EarliestTime();
//这个是从最后拿到的开始取//
long whichTime = kafka.api.OffsetRequest.LatestTime();
System.out.println("lastTime:"+whichTime);
String clientName = "Client_" + topic + "_" +
SimpleConsumer consumer = new SimpleConsumer(leadBroker.host(),
leadBroker.port(),
* 1024, clientName);
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map&TopicAndPartition, PartitionOffsetRequestInfo& requestInfo = new HashMap&TopicAndPartition, PartitionOffsetRequestInfo&();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
// 获取指定时间前有效的offset列表
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out
.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
// 千万不要认为offset一定是从0开始的
long[] offsets = response.offsets(topic, partition);
System.out.println("offset list:" + Arrays.toString(offsets));
long offset = offsets[0];
while (maxReads & 0) {
// 注意不要调用里面的replicaId()方法,这是内部使用的。
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
// 出错处理。这里只直接返回了。实际上可以根据出错的类型进行判断,如code == ErrorMapping.OffsetOutOfRangeCode()表示拿到的offset错误
// 一般出错处理可以再次拿offset,或重新找leader,重新建立consumer。可以将上面的操作都封装成方法。再在该循环来进行消费
// 当然,在取所有leader的同时可以用metadata.replicas()更新最新的节点信息。另外zookeeper可能不会立即检测到有节点挂掉,故如果发现老的leader和新的leader一样,可能是leader根本没挂,也可能是zookeeper还没检测到,总之需要等等。
short code = fetchResponse.errorCode(topic, partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
//取一批消息
boolean empty =
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
topic, partition)) {
long curOffset = messageAndOffset.offset();
//下面这个检测有必要,因为当消息是压缩的时候,通过fetch获取到的是一个整块数据。块中解压后不一定第一个消息就是offset所指定的。就是说存在再次取到已读过的消息。
if (curOffset & offset) {
System.out.println("Found an old offset: " + curOffset
+ " Expecting: " + offset);
// 可以通过当前消息知道下一条消息的offset是多少
offset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset())
+ ": " + new String(bytes, "UTF-8"));
maxReads++;
//进入循环中,等待一会后获取下一批数据
if(empty){
Thread.sleep(1000);
// 退出(这里象征性的写一下)
if (consumer != null)
consumer.close();另外还有采用hadoop专用api实现消息保存到hadoop中(这里略)消费者配置消费者或consumer.properties配置:zookeeper.connect:zookeeper连接服务器地址zookeeper.session.timeout.ms对zookeeper的session过期时间,默认6000ms,用于检测消费者是否挂掉,当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡group.id:指定消费组<mit.enable:是否自动提交:这里提交意味着客户端会自动定时更新offset到zookeeper.默认为true<mit.interval.ms:自动更新时间。默认60 * 1000auto.offset.reset:如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largestconsumer.timeout.ms:如果一段时间没有收到消息,则抛异常。默认-1fetch.message.max.bytes:每次取的块的大小(默认),多个消息通过块来批量发送给消费者,指定块大小可以指定有多少消息可以一次取出。注意若一个消息就超过了该块指定的大小,它将拿不到queued.max.message.chunks:最大取多少块缓存到消费者(默认10)。更多配置可参见ConsumerConfig类生产者配置生产者或producer.properties配置:metadata.broker.list:指定kafka节点列表,用于获取metadata,不必全部指定request.required.acks:指定生产者发送请求如何确认完成:0(默认)表示生产者不用等待broker返回ack。1表示当有复本(该复本节点不一定是同步)收到了消息后发回ack给生产者(如果leader挂掉且刚好收到消息的复本也挂掉则消息丢失)。-1表示所有已同步的复本收到了消息后发回ack给生产者(可以保证只要有一个已同步的复本存活就不会有数据丢失)。producer.type:同步还是异步,默认2表同步,1表异步。异步可以提高发送吞吐量,但是也可能导致丢失未发送过去的消息queue.buffering.max.ms:如果是异步,指定每次发送最大间隔时间queue.buffering.max.messages:如果是异步,指定每次发送缓存最大数据量serializer.class:指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[]key.serializer.class:单独序列化key处理类,默认和serializer.class一致partitioner.class:指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区message.send.max.retries:消息发送重试次数,默认3次retry.backoff.ms:消息发送重试间隔次数compression.codec:是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。compressed.topics:如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示全压缩。更多配置可参见ProducerConfig类
阅读(5975)|
用微信&&“扫一扫”
将文章分享到朋友圈。
用易信&&“扫一扫”
将文章分享到朋友圈。
历史上的今天
loftPermalink:'',
id:'fks_',
blogTitle:'kafka系列教程5(客户端实践)',
blogAbstract:'添加依赖可以使用服务器端下载的kafka二进制包及依赖,也可以通过mavne获取(注意实测发现该方式拿到的包是用jdk7打的):\t\t&dependency&\t\t\t&groupId&com.sksamuel.kafka&/groupId&\t\t\t&artifactId&kafka_2.10&/artifactId&\t\t\t&version&0.8.0-beta1&/version&\t\t&/dependency&',
blogTag:'kafka',
blogUrl:'blog/static/',
isPublished:1,
istop:false,
modifyTime:4,
publishTime:5,
permalink:'blog/static/',
commentCount:2,
mainCommentCount:2,
recommendCount:0,
bsrk:-100,
publisherId:,
recomBlogHome:false,
currentRecomBlog:false,
attachmentsFileIds:[],
groupInfo:{},
friendstatus:'none',
followstatus:'unFollow',
pubSucc:'',
visitorProvince:'',
visitorCity:'',
visitorNewUser:false,
postAddInfo:{},
mset:'000',
remindgoodnightblog:false,
isBlackVisitor:false,
isShowYodaoAd:false,
hostIntro:'关注海量数据存储、处理和检索,MySQL,系统运维,图像处理等技术',
selfRecomBlogCount:'0',
lofter_single:''
{list a as x}
{if x.moveFrom=='wap'}
{elseif x.moveFrom=='iphone'}
{elseif x.moveFrom=='android'}
{elseif x.moveFrom=='mobile'}
${a.selfIntro|escape}{if great260}${suplement}{/if}
{list a as x}
推荐过这篇日志的人:
{list a as x}
{if !!b&&b.length>0}
他们还推荐了:
{list b as y}
转载记录:
{list d as x}
{list a as x}
{list a as x}
{list a as x}
{list a as x}
{if x_index>4}{break}{/if}
${fn2(x.publishTime,'yyyy-MM-dd HH:mm:ss')}
{list a as x}
{if !!(blogDetail.preBlogPermalink)}
{if !!(blogDetail.nextBlogPermalink)}
{list a as x}
{if defined('newslist')&&newslist.length>0}
{list newslist as x}
{if x_index>7}{break}{/if}
{list a as x}
{var first_option =}
{list x.voteDetailList as voteToOption}
{if voteToOption==1}
{if first_option==false},{/if}&&“${b[voteToOption_index]}”&&
{if (x.role!="-1") },“我是${c[x.role]}”&&{/if}
&&&&&&&&${fn1(x.voteTime)}
{if x.userName==''}{/if}
网易公司版权所有&&
{list x.l as y}
{if defined('wl')}
{list wl as x}{/list}

我要回帖

更多关于 hadoop zookeeper 的文章

 

随机推荐