Kafka (0.9.0.1) Offset Reset Tool

2019-10-11 05:44

When the KafkaConsumerThread starts, that is, inside KafkaConsumerThread's run method:

How it works:
  • It simply uses the API provided by librdkafka to subscribe to the topic and then force-commits the reset offsets (see the Java sketch after this list for the same idea expressed with the Java client);
  • The consumers already running in production do not need to be stopped;
  • Because of how Kafka rebalancing works, the tool does not succeed every single time, but in my tests the success rate was quite high, and it is far less tedious than manually resetting offsets and restarting the consumers.
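The tool itself is built on librdkafka, but the approach can be sketched with a recent plain Java consumer client as well (the APIs differ slightly in 0.9). The snippet below is only an illustration, not the tool's code: it assigns the affected partitions directly (no subscribe/rebalance), seeks to the beginning, and force-commits the new offsets for the group. The broker address, topic, and group name are placeholders borrowed from the example later in this article; note that a commit issued from outside an active group may still be rejected by the coordinator, which is part of why the reset is not 100% reliable.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class OffsetResetSketch {
    public static void main(String[] args) {
        // placeholder connection settings; adjust for your cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "console-consumer-46965");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // assign the partitions directly instead of subscribing, so no rebalance is triggered for us
            List<TopicPartition> partitions = Arrays.asList(
                    new TopicPartition("test", 0),
                    new TopicPartition("test", 1),
                    new TopicPartition("test", 2));
            consumer.assign(partitions);

            // reset to the earliest available offsets and commit them on behalf of the group
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                newOffsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            consumer.commitSync(newOffsets);
        }
    }
}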

  However, many users still want to know exactly what information is stored inside the __consumer_offsets topic, and in particular how a given consumer group's offsets are recorded there. To answer that question, this article walks through a concrete example of using the kafka-simple-consumer-shell script to query this internal topic.

The open method mainly stores the user-specified topics and their corresponding partitions and offsets into Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets. Next, let's look at the entry point through which Flink consumes from Kafka.

Getting the tool
  • GitHub repository: KafkaOffsetTools
  • It needs to be compiled before use
  • Usage:
Usage:
  --broker_list arg     kafka broker list
  --topic arg           kafka topic name
  --group arg           consumer group name
  --partition_list arg  reset partition list
                        "":all partitions(default value)
                         Or 1,2,3...
  --reset_pos arg (=0)  reset partitions to position:
                        0:earliest
                        1:latest
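For illustration only — the name of the compiled binary is hypothetical here (the flags are the ones from the usage text above) — resetting all partitions of topic test for group console-consumer-46965 to the earliest offsets would look something like:

./kafka_offset_reset --broker_list localhost:9092 --topic test --group console-consumer-46965 --reset_pos 0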

8. Get the offset information for the specified consumer group

@Override
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode: ON_CHECKPOINTS, DISABLED or KAFKA_PERIODIC
    // (this article focuses on ON_CHECKPOINTS)
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

    // create the kafka partition discoverer
    this.partitionDiscoverer = createPartitionDiscoverer(
            topicsDescriptor,
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks());
    this.partitionDiscoverer.open();

    subscribedPartitionsToStartOffsets = new HashMap<>();
    // fetch all partitions of the fixed topics or the topic pattern
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

    // restoring from a checkpoint
    if (restoredState != null) {
        for (KafkaTopicPartition partition : allPartitions) {
            // new partitions (not present in the checkpoint) start from the earliest offset;
            // old partitions have already been restored from the checkpoint and are kept in
            // subscribedPartitionsToStartOffsets
            if (!restoredState.containsKey(partition)) {
                restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
            }
        }

        for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
            if (!restoredFromOldState) {
                // seed the partition discoverer with the union state while filtering out
                // restored partitions that should not be subscribed by this subtask
                if (KafkaTopicPartitionAssigner.assign(
                        restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                        == getRuntimeContext().getIndexOfThisSubtask()) {
                    subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            } else {
                // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                // in this case, just use the restored state as the subscribed partitions
                subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
            }
        }

        if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
            subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                    LOG.warn(
                            "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                            entry.getKey());
                    return true;
                }
                return false;
            });
        }

        LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
    } else {
        // use the partition discoverer to fetch the initial seed partitions,
        // and set their initial offsets depending on the startup mode.
        // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
        // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
        // when the partition is actually read.
        switch (startupMode) {
            case SPECIFIC_OFFSETS:
                if (specificStartupOffsets == null) {
                    throw new IllegalStateException(
                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                            ", but no specific offsets were specified.");
                }

                for (KafkaTopicPartition seedPartition : allPartitions) {
                    // partitions with a user-specified offset start from that offset;
                    // unspecified partitions fall back to the group offset
                    Long specificOffset = specificStartupOffsets.get(seedPartition);
                    if (specificOffset != null) {
                        // since the specified offsets represent the next record to read, we subtract
                        // it by one so that the initial state of the consumer will be correct
                        subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
                    } else {
                        // default to group offset behaviour if the user-provided specific offsets
                        // do not contain a value for this partition;
                        // the sentinel corresponding to the startupMode is also stored in subscribedPartitionsToStartOffsets
                        subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                    }
                }
                break;
            case TIMESTAMP:
                if (startupOffsetsTimestamp == null) {
                    throw new IllegalStateException(
                            "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                            ", but no startup timestamp was specified.");
                }

                for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
                        : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
                    subscribedPartitionsToStartOffsets.put(
                            partitionToOffset.getKey(),
                            (partitionToOffset.getValue() == null)
                                    // if an offset cannot be retrieved for a partition with the given timestamp,
                                    // we default to using the latest offset for the partition
                                    ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                                    // since the specified offsets represent the next record to read, we subtract
                                    // it by one so that the initial state of the consumer will be correct
                                    : partitionToOffset.getValue() - 1);
                }
                break;
            default:
                // default: GROUP_OFFSETS
                for (KafkaTopicPartition seedPartition : allPartitions) {
                    subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                }
        }

        if (!subscribedPartitionsToStartOffsets.isEmpty()) {
            switch (startupMode) {
                case EARLIEST:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet());
                    break;
                case LATEST:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet());
                    break;
                case TIMESTAMP:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), startupOffsetsTimestamp, subscribedPartitionsToStartOffsets.keySet());
                    break;
                case SPECIFIC_OFFSETS:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), specificStartupOffsets, subscribedPartitionsToStartOffsets.keySet());

                    List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                    for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                        if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                            partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                        }
                    }

                    if (partitionsDefaultedToGroupOffsets.size() > 0) {
                        LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(), partitionsDefaultedToGroupOffsets.size(), partitionsDefaultedToGroupOffsets);
                    }
                    break;
                case GROUP_OFFSETS:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet());
            }
        } else {
            LOG.info("Consumer subtask {} initially has no partitions to read from.", getRuntimeContext().getIndexOfThisSubtask());
        }
    }
}
Why I wrote this small tool
  • In an earlier article, "Resetting Kafka Consumer Offsets", I described how librdkafka can be used to write a small offset-reset tool;
  • librdkafka has one limitation: in the current version the author restricts re-committing an earlier offset; see this issue: Allow re-Committing offsets;
  • When a broker in the Kafka cluster dies and cannot be recovered, then for a topic without replicas, offsets can no longer be committed for the partitions on that dead broker; you have to stop the consumer, reset the offsets, and then restart the consumer;
  • If there are many such topics and corresponding consumers in production, restarting every consumer is not a good option :(

1. Create topic "test"

protected AbstractFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    this.sourceContext = checkNotNull(sourceContext);
    this.checkpointLock = sourceContext.getCheckpointLock();
    this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
    this.useMetrics = useMetrics;
    this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
    this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
    this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);

    // figure out what watermark mode we will be using
    this.watermarksPeriodic = watermarksPeriodic;
    this.watermarksPunctuated = watermarksPunctuated;
    if (watermarksPeriodic == null) {
        if (watermarksPunctuated == null) {
            // simple case, no watermarks involved
            timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
        } else {
            timestampWatermarkMode = PUNCTUATED_WATERMARKS;
        }
    } else {
        if (watermarksPunctuated == null) {
            timestampWatermarkMode = PERIODIC_WATERMARKS;
        } else {
            throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
        }
    }

    this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

    // initialize subscribed partition states with seed partitions, depending on whether
    // timestamps / watermarks are used; subscribedPartitionStates holds a
    // List<KafkaTopicPartitionState<KPH>>, and each KafkaTopicPartitionState carries the
    // KafkaTopicPartition, its offset, and so on
    this.subscribedPartitionStates = createPartitionStateHolders(
            seedPartitionsWithInitialOffsets,
            timestampWatermarkMode,
            watermarksPeriodic,
            watermarksPunctuated,
            userCodeClassLoader);

    // check that all seed partition states have a defined offset
    // (whether restored from a checkpoint or set via kafkaConsumer.setStartFrom...,
    // every partition must have an initial offset)
    for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
        if (!partitionState.isOffsetDefined()) {
            throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
        }
    }

    // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
    // (up to this point the consumer has not been assigned any partition)
    for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
        unassignedPartitionsQueue.add(partition);
    }

    // register metrics for the initial seed partitions
    if (useMetrics) {
        registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
    }

    // if we have periodic watermarks, kick off the interval scheduler
    if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
        @SuppressWarnings("unchecked")
        PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
                subscribedPartitionStates,
                sourceContext,
                processingTimeProvider,
                autoWatermarkInterval);
        periodicEmitter.start();
    }
}
Further improvement:
  • The tool currently targets a single topic and a single group. Since we already know which broker has failed, we could scan for all affected topic partitions and groups and reset their offsets automatically (a rough sketch of that idea follows below);
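A rough sketch of that scan, assuming a cluster and client new enough to have the Java AdminClient (it does not exist in 0.9.0.1, so this is only how it could look on a newer stack; the broker address and dead broker id are placeholders): list all topics and collect the single-replica partitions whose leader was the failed broker.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.*;

public class FindPartitionsOnDeadBroker {
    public static void main(String[] args) throws Exception {
        int deadBrokerId = 3; // assumption: the id of the broker that cannot be recovered

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            Map<String, TopicDescription> descriptions = admin.describeTopics(topics).all().get();

            List<TopicPartition> affected = new ArrayList<>();
            for (TopicDescription description : descriptions.values()) {
                for (TopicPartitionInfo p : description.partitions()) {
                    // single-replica partitions led by the dead broker can no longer accept offset commits
                    if (p.replicas().size() == 1 && p.leader() != null && p.leader().id() == deadBrokerId) {
                        affected.add(new TopicPartition(description.name(), p.partition()));
                    }
                }
            }
            System.out.println("Partitions needing an offset reset: " + affected);
        }
    }
}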

test:0:22

createFetcher is given the subscribedPartitionsToStartOffsets map built above. Continuing on, when the KafkaFetcher object is created, the map is passed as a constructor argument and finally ends up in the AbstractFetcher constructor.

7. Compute which partition of __consumer_offsets holds the specified consumer group's offsets

......
try {
    // hasAssignedPartitions defaults to false.
    // When new partitions are discovered, they are added to unassignedPartitionsQueue
    // and subscribedPartitionsToStartOffsets.
    if (hasAssignedPartitions) {
        newPartitions = unassignedPartitionsQueue.pollBatch();
    } else {
        // if no assigned partitions block until we get at least one
        // instead of hot spinning this loop. We rely on a fact that
        // unassignedPartitionsQueue will be closed on a shutdown, so
        // we don't block indefinitely
        newPartitions = unassignedPartitionsQueue.getBatchBlocking();
    }
    // since unassignedPartitionsQueue already contains data, newPartitions != null is true,
    // so reassignPartitions is executed
    if (newPartitions != null) {
        reassignPartitions(newPartitions);
    }
} catch (AbortedReassignmentException e) {
    continue;
}
......

void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
    if (newPartitions.size() == 0) {
        return;
    }
    hasAssignedPartitions = true;
    boolean reassignmentStarted = false;

    // since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
    // the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown()
    // until the reassignment is complete.
    final KafkaConsumer<byte[], byte[]> consumerTmp;
    synchronized (consumerReassignmentLock) {
        // hand the consumer reference over to consumerTmp
        consumerTmp = this.consumer;
        this.consumer = null;
    }

    final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
    try {
        // there are both new and old partitions because, when KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
        // is configured, Flink periodically checks whether new partitions have been added; newly
        // discovered partitions are put into unassignedPartitionsQueue
        for (TopicPartition oldPartition : consumerTmp.assignment()) {
            oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
        }

        final List<TopicPartition> newPartitionAssignments =
                new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
        newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
        newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

        // reassign with the new partitions
        consumerTmp.assign(newPartitionAssignments);
        reassignmentStarted = true;

        // old partitions should be seeked to their previous position
        for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
            consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
        }

        // offsets in the state of new partitions may still be placeholder sentinel values if we are:
        //   (1) starting fresh,
        //   (2) checkpoint / savepoint state we were restored with had not completely
        //       been replaced with actual offset values yet, or
        //   (3) the partition was newly discovered after startup;
        // replace those with actual offsets, according to what the sentinel value represent.
        // Note: the Kafka properties concerning offsets do not take effect here; the seek depends on
        // the startup mode, because the sentinel returned by getOffset() comes from startupMode.
        for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
            if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
                consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
            } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
                newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
            } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                // the KafkaConsumer by default will automatically seek the consumer position
                // to the committed group offset, so we do not need to do it.
                newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
            } else {
                consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
            }
        }
    } catch (WakeupException e) {
        // a WakeupException may be thrown if the consumer was invoked wakeup()
        // before it was isolated for the reassignment. In this case, we abort the
        // reassignment and just re-expose the original consumer.
        synchronized (consumerReassignmentLock) {
            this.consumer = consumerTmp;

            // if reassignment had already started and affected the consumer,
            // we do a full roll back so that it is as if it was left untouched
            if (reassignmentStarted) {
                this.consumer.assign(new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));
                for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
                    this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
                }
            }

            // no need to restore the wakeup state in this case,
            // since only the last wakeup call is effective anyways
            hasBufferedWakeup = false;

            // re-add all new partitions back to the unassigned partitions queue to be picked up again
            for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
                unassignedPartitionsQueue.add(newPartition);
            }

            // this signals the main fetch loop to continue through the loop
            throw new AbortedReassignmentException();
        }
    }

    // reassignment complete; expose the reassigned consumer
    synchronized (consumerReassignmentLock) {
        this.consumer = consumerTmp;

        // restore wakeup state for the consumer if necessary
        if (hasBufferedWakeup) {
            this.consumer.wakeup();
            hasBufferedWakeup = false;
        }
    }
}

6. Query the full contents of the __consumer_offsets topic

Next, from the constructor of KafkaFetcher, the subclass of AbstractFetcher, we can see that unassignedPartitionsQueue is in turn passed to the KafkaConsumerThread.

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

I had long wondered: if consumer.setStartFromLatest() and kafkaProperties.put("auto.offset.reset", "earliest") are both present, which one actually takes effect? The answer is of course consumer.setStartFromLatest(). Why? Let's walk through it together.
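To make the scenario concrete, here is a minimal sketch of the conflicting configuration (class names follow the newer universal Flink connector; on older Flink versions the class would be FlinkKafkaConsumer011 or similar). As the walkthrough below shows, the explicit startup mode wins: open() records a per-partition sentinel derived from it, and the fetcher later translates that sentinel into an explicit seek, so the client-side auto.offset.reset setting never gets a chance to apply.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class StartupModeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        kafkaProperties.put("group.id", "flink-demo");
        // this property is effectively ignored by Flink's fetcher ...
        kafkaProperties.put("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), kafkaProperties);
        // ... because this sets startupMode = LATEST, which open() turns into a sentinel offset
        // and reassignPartitions() later turns into an explicit seekToEnd()
        consumer.setStartFromLatest();

        env.addSource(consumer).print();
        env.execute("startup-mode-demo");
    }
}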

Before version 0.11.0.0

@Override
// entry point: start a source
public void run(SourceContext<T> sourceContext) throws Exception {
    if (subscribedPartitionsToStartOffsets == null) {
        throw new Exception("The partitions were not set for the consumer");
    }

    // initialize commit metrics and default offset callback method
    this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
    this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);

    this.offsetCommitCallback = new KafkaCommitCallback() {
        @Override
        public void onSuccess() {
            successfulCommits.inc();
        }

        @Override
        public void onException(Throwable cause) {
            LOG.warn("Async Kafka commit failed.", cause);
            failedCommits.inc();
        }
    };

    // mark the subtask as temporarily idle if there are no initial seed partitions;
    // once this subtask discovers some partitions and starts collecting records, the subtask's
    // status will automatically be triggered back to be active.
    if (subscribedPartitionsToStartOffsets.isEmpty()) {
        sourceContext.markAsTemporarilyIdle();
    }

    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            periodicWatermarkAssigner,
            punctuatedWatermarkAssigner,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);

    if (!running) {
        return;
    }

    // depending on whether we were restored with the current state version,
    // remaining logic branches off into 2 paths:
    //   1) New state - partition discovery loop executed as separate thread, with this
    //                  thread running the main fetcher loop
    //   2) Old state - partition discovery is disabled and only the main fetcher loop is executed
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        kafkaFetcher.runFetchLoop();
    } else {
        runWithPartitionDiscovery();
    }
}

  Okay, by now you should know how to query the contents of the __consumer_offsets topic. I hope this article has been helpful. (Kafka of course also provides Java APIs for such queries; the details are not covered here, but interested readers can look them up.)
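For completeness, here is a small sketch of the programmatic route with the standard Java consumer client (the topic and group names are the ones used in this example): KafkaConsumer.committed() returns the committed offset for a given partition, so a group's offsets can be read without decoding __consumer_offsets by hand.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Properties;

public class CommittedOffsetQuery {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "console-consumer-46965"); // the group whose offsets we want to read
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // topic "test" has 3 partitions in this example
            for (int partition = 0; partition < 3; partition++) {
                OffsetAndMetadata committed = consumer.committed(new TopicPartition("test", partition));
                System.out.printf("test-%d committed offset: %s%n",
                        partition, committed == null ? "none" : committed.offset());
            }
        }
    }
}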

public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    super(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            processingTimeProvider,
            autoWatermarkInterval,
            userCodeClassLoader,
            consumerMetricGroup,
            useMetrics);

    this.deserializer = deserializer;
    this.handover = new Handover();

    this.consumerThread = new KafkaConsumerThread(
            LOG,
            handover,
            kafkaProperties,
            unassignedPartitionsQueue,
            getFetcherName() + " for " + taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);
}

3. Verify that the messages were produced successfully

  Since no key was specified, the messages are spread across the partitions in a round-robin fashion. (64 messages were produced in this example; an equivalent Java producer sketch follows below.)
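The article produces these messages with kafka-console-producer.sh; purely as an equivalent sketch, the same 64 keyless messages could be produced with the Java client as follows (with no key, older clients spread records round-robin across partitions; newer clients use a sticky partitioner, so the exact distribution may differ):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProduceTestMessages {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 64; i++) {
                // no key -> the partitioner, not a key hash, decides the target partition
                producer.send(new ProducerRecord<>("test", "message-" + i));
            }
            producer.flush();
        }
    }
}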

...
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
 ...
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer

Before version 0.11.0.0

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter"

  As is well known, ZooKeeper is not suited to frequent, high-volume writes, so newer Kafka versions store consumer offset information in an internal Kafka topic, __consumer_offsets, and by default ship the kafka-consumer-groups.sh script for users to inspect consumer information.

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

 

test:1:21

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test --time -1
Math.abs(groupID.hashCode()) % numPartitions

Output: console-consumer-46965 (remember this id!)

2. Use the kafka-console-producer.sh script to produce messages

 

This is where the group.id obtained in step 5 (console-consumer-46965 in this example) comes into play. Kafka uses the following formula to determine which partition of __consumer_offsets a group's offsets are stored in:

test:2:21

So in this example the partition is Math.abs("console-consumer-46965".hashCode()) % 50 = 11, i.e. partition 11 of __consumer_offsets holds this consumer group's offset information. Let's verify that below.
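The lookup is easy to reproduce in a couple of lines (50 is the default value of offsets.topic.num.partitions); for the group id in this example the article reports the result 11:

public class OffsetsPartitionFor {
    public static void main(String[] args) {
        String groupId = "console-consumer-46965";
        int numPartitions = 50; // default number of __consumer_offsets partitions
        System.out.println(Math.abs(groupId.hashCode()) % numPartitions);
    }
}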

4. Create a console consumer group

Version 0.11.0.0 and later (inclusive)

Here is the output:

  As the output shows, the consumer group is indeed stored on partition 11, and the offsets are correct (strictly speaking, the offsets shown are the consumed offsets, not exactly the offsets from step 3; since my consumer had already consumed all the messages, they happen to match the offsets in step 3). You can also see that each log entry in the __consumer_offsets topic has the format: [Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

By default __consumer_offsets has 50 partitions. If your system also has a large number of consumer groups, the output of this command can be very long.

Version 0.11.0.0 and later (inclusive)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning --new-consumer

The output confirms that all 64 messages were produced successfully!

5. Get the group id of this consumer group (we will need it later to query its offset information)

Note: before running the command above, set exclude.internal.topics=false in consumer.properties.
