Kafka Installation, Deployment, and Code

Kafka Installation and Deployment

Prerequisites: JDK and ZooKeeper are installed, deployed, and can start normally.

# Extract the archive
tar zxvf kafka_2.11-1.0.0.tgz -C ../servers/
# Edit the broker configuration
vim /export/servers/kafka_2.11-1.0.0/config/server.properties
# (must be different on every node)
broker.id=0
log.dirs=/export/servers/kafka_2.11-1.0.0/logs/
zookeeper.connect=node01:2181,node02:2181,node03:2181
delete.topic.enable=true
host.name=node01
# Copy to the other nodes (adjust broker.id and host.name after copying)
scp -r kafka_2.11-1.0.0 node02:$PWD
scp -r kafka_2.11-1.0.0 node03:$PWD
# Start Kafka on every node
# Start ZooKeeper first with zkstart.sh (a custom script), then start Kafka on each node
node01: nohup ./bin/kafka-server-start.sh  config/server.properties &
node02: nohup ./bin/kafka-server-start.sh  config/server.properties &
node03: nohup ./bin/kafka-server-start.sh  config/server.properties &
# Kafka cluster operations
# Create a topic
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic 18BD34
# List topics
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
# Simulate a producer and produce data
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic 18BD34
# --broker-list is the list of servers that store the data
# Simulate a consumer and consume data
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic 18BD34 --from-beginning
# --from-beginning means consume from the start of the topic
# --zookeeper records how far consumption has progressed (the consumer offset)

(Figure: Kafka installation, deployment, and code - 01)
(Figure: Kafka installation, deployment, and code - 02)

//StreamAPI
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

public class StreamAPI {

    public static void main(String[] args) {
//        Legacy (pre-1.0) API version of the same topology, using KStreamBuilder:
//        Properties props = new Properties();
//        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
//        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
//        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//        KStreamBuilder builder = new KStreamBuilder();
//        builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
//        KafkaStreams streams = new KafkaStreams(builder, props);
//        streams.start();

        Properties props = new Properties();
        // Unique identifier for this Streams application
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        // Kafka cluster to connect to
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        // Default key/value serdes (serialization and deserialization)
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Build the processing logic
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // stream: read from topic demo01; to: write the uppercased values to demo02
        streamsBuilder.stream("demo01").mapValues(line -> line.toString().toUpperCase()).to("demo02");
        // Build the Topology (the processing graph)
        final Topology topology = streamsBuilder.build();
        // Create the Kafka Streams instance
        KafkaStreams streams = new KafkaStreams(topology, props);
        // Start the stream processing
        streams.start();
    }
}
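
The StreamAPI example starts the topology and then lets main() return, so nothing ever stops the app cleanly. A common pattern is to close the streams instance from a JVM shutdown hook; below is a minimal sketch of that pattern (the class name and the CountDownLatch are illustrative, not part of the original example):

//StreamAPI with a clean shutdown
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class StreamAPIWithShutdown {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Same topology as above: uppercase every value from demo01 into demo02
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("demo01").mapValues(line -> line.toString().toUpperCase()).to("demo02");

        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
        CountDownLatch latch = new CountDownLatch(1);
        // Close the streams instance when the JVM shuts down (e.g. Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();   // stop processing and release resources
            latch.countDown(); // let main() return
        }));
        streams.start();
        latch.await(); // keep the application running until shutdown
    }
}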
//Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        // Kafka cluster configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Acknowledgement mode: "all" waits for all in-sync replicas
        props.put("acks", "all");
        // Number of retries on transient send failures
        props.put("retries", 2);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // How long (ms) to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory (bytes) for buffering unsent records
        props.put("buffer.memory", 33554432);
        // Serializers for record keys and values
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        // Send test records to topic 18BD-10, partition 2, key "rua" (the loop bound is illustrative)
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("18BD-10", 2, "rua", "test" + i);
            kafkaProducer.send(producerRecord);
        }
        kafkaProducer.close();
    }
}
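
send() is asynchronous, so the loop above never learns whether a record was actually written. The same Producer API accepts a Callback that fires once the broker acknowledges or rejects the record; a minimal sketch (the class name and error handling are illustrative):

//Producer with a delivery callback
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

public class ProducerWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("18BD-10", "rua", "hello");
        // onCompletion runs when the broker acknowledges (or rejects) the record
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace(); // delivery failed after all retries
                } else {
                    System.out.println("written to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            }
        });
        producer.close(); // also flushes any outstanding sends
    }
}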
//Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;

public class Consumersss {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group
        props.put("group.id", "test");
        // Offsets are committed manually below, so disable auto commit
        props.put("enable.auto.commit", "false");
        // Deserializers for record keys and values
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        // Manually assign specific partitions of the topic
        TopicPartition topicPartition0 = new TopicPartition("18BD-50", 0);
        TopicPartition topicPartition1 = new TopicPartition("18BD-50", 2);
        kafkaConsumer.assign(Arrays.asList(topicPartition0, topicPartition1));
        // Seek to a specific offset before consuming
        //kafkaConsumer.seek(topicPartition0, 0);
        kafkaConsumer.seek(topicPartition1, 10);

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("partition: " + consumerRecord.partition());
                System.out.println("offset: " + consumerRecord.offset());
                System.out.println("key: " + consumerRecord.key());
                System.out.println("value: " + consumerRecord.value());
                System.out.println("-------------------------------");
            }
            // Commit the offsets manually
            kafkaConsumer.commitSync();
        }

    }
}

//auto.offset.reset options
//earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
//latest: if a partition has a committed offset, consume from it; otherwise consume only newly produced data
//none: if every partition has a committed offset, consume from those offsets; if any partition lacks one, throw an exception
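
Note that auto.offset.reset only takes effect when the group has no committed offset, so replaying a topic from the beginning usually means pairing earliest with a group.id that has never committed. A minimal sketch (the class name, group name, and topic are illustrative):

//Consumer that replays a topic from the beginning
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class ReplayFromBeginning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // A fresh group id has no committed offsets, so earliest applies
        props.put("group.id", "replay-" + System.currentTimeMillis());
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("18BD34"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}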
# Kafka cluster operation commands
# 1. Create a topic
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test
# 2. List topics
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
# 3. Produce data
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
# 4. Consume data
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
# 5. Describe a topic (detailed information)
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
# 6. Increase the number of partitions
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
# 7. Add a topic-level config
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
# 8. Remove a topic-level config
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
# 9. Delete a topic
bin/kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
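
The same administrative operations can also be issued programmatically. Assuming the AdminClient API that ships with kafka-clients 0.11+, here is a sketch of creating and listing topics (the class name is illustrative; note that AdminClient connects to the brokers, not to ZooKeeper):

//Topic administration via AdminClient
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicAdmin {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // AdminClient talks to the brokers directly, not to ZooKeeper
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        AdminClient admin = AdminClient.create(props);
        // Topic name, partition count, replication factor: same as the shell command above
        NewTopic topic = new NewTopic("test", 3, (short) 2);
        admin.createTopics(Collections.singletonList(topic)).all().get(); // wait for completion
        System.out.println("existing topics: " + admin.listTopics().names().get());
        admin.close();
    }
}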
//Kafka producer parameter reference:
Properties props = new Properties();
//Kafka broker address
props.put("bootstrap.servers", "node01:9092");
//Acknowledgement mode
props.put("acks", "all");
//Retries on transient failures
props.put("retries", 0);
//Batch size in bytes
props.put("batch.size", 16384);
//How long (ms) to wait for more records before sending a batch
props.put("linger.ms", 1);
//Total buffer memory in bytes
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//Kafka consumer parameter reference:
Properties props = new Properties();
//Kafka broker address
props.put("bootstrap.servers", "hadoop-01:9092");
//Consumer group
props.put("group.id", "test");
//The next two lines enable automatic offset commits
props.put("enable.auto.commit", "true");
//Auto-commit interval in ms
props.put("auto.commit.interval.ms", 1000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
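
With this auto-commit configuration, offsets are committed in the background every auto.commit.interval.ms and no commitSync() call is needed, in contrast to the manual-commit consumer earlier. A minimal poll loop under these settings (the class name and topic are illustrative):

//Consumer with automatic offset commits
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class AutoCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-01:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", 1000);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe() uses group management; partitions are assigned automatically
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " -> " + record.value());
            }
            // no commitSync(): offsets are committed automatically in the background
        }
    }
}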

