加入收藏 | 设为首页 | 会员中心 | 我要投稿 51站长网 (https://www.51zhanzhang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
站内搜索:
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

Kafka之消费与激情

发布时间:2021-05-29 19:13:52 所属栏目:Linux 来源:互联网
导读:1、Kafka消费 首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的

                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {  

                        client.pollNoWakeup();                    }                    return this.interceptors.onConsume(new ConsumerRecords<>(records));  

                }                final long fetchEnd = time.milliseconds();                elapsedTime += fetchEnd - metadataEnd;            } while (elapsedTime < timeoutMs);  

            return ConsumerRecords.empty();  

        } finally {  

            release();        }    }  

上述代码中有个方法pollForFetches,它的实现逻辑如下:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {  

        final long startMs = time.milliseconds();  

        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);  

        // if data is available already, return it immediately  

        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();  

        if (!records.isEmpty()) {  

            return records;  

        }  

        // send any new fetches (won't resend pending fetches)  

        fetcher.sendFetches();  

(编辑:ASP站长)

【免责声明】本站内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

相关内容
未处理完善
    无相关信息
未处理完善