Kafka Installation and Deployment
Prerequisites: JDK and ZooKeeper are installed, deployed, and can be started normally.
# Extract the archive
tar zxvf kafka_2.11-1.0.0.tgz -C ../servers/
# Edit the broker configuration
vim /export/servers/kafka_2.11-1.0.0/config/server.properties
# broker.id must be unique on every node
broker.id=0
log.dirs=/export/servers/kafka_2.11-1.0.0/logs/
zookeeper.connect=node01:2181,node02:2181,node03:2181
delete.topic.enable=true
host.name=node01
# Copy to the other nodes (adjust the per-node settings after copying, e.g. as sketched below)
scp -r kafka_2.11-1.0.0 node02:$PWD
scp -r kafka_2.11-1.0.0 node03:$PWD
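The copied configuration still carries node01's values, so broker.id and host.name have to be changed on each target node. A minimal sketch of those edits, assuming the convention broker.id=1 for node02 and broker.id=2 for node03 (that numbering is an assumption, not from the original):
# On node02 (analogously broker.id=2 / host.name=node03 on node03):
sed -i 's/^broker.id=0/broker.id=1/' /export/servers/kafka_2.11-1.0.0/config/server.properties
sed -i 's/^host.name=node01/host.name=node02/' /export/servers/kafka_2.11-1.0.0/config/server.properties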
# Start Kafka on all nodes
# Start ZooKeeper first with zkstart.sh (a self-written script, sketched below), then start Kafka on each node
node01: nohup ./bin/kafka-server-start.sh config/server.properties &
node02: nohup ./bin/kafka-server-start.sh config/server.properties &
node03: nohup ./bin/kafka-server-start.sh config/server.properties &
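The zkstart.sh mentioned above is not shipped with ZooKeeper or Kafka. A minimal sketch of such a script, assuming passwordless SSH between the nodes and a ZooKeeper install at /export/servers/zookeeper on each node (both assumptions, not from the original):
#!/bin/bash
# zkstart.sh -- start ZooKeeper on all three nodes
for host in node01 node02 node03; do
  echo "Starting ZooKeeper on $host"
  ssh "$host" "source /etc/profile; /export/servers/zookeeper/bin/zkServer.sh start"
done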
# Kafka cluster operations
# Create a topic
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic 18BD34
# List topics
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
# Simulate a producer and produce data
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic 18BD34
# --broker-list is the list of brokers the data is sent to and stored on
# Simulate a consumer and consume data
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic 18BD34
# --from-beginning means consume from the beginning of the topic
# --zookeeper is where the consumer's position is recorded (how far into the data consumption has progressed)
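For example, to re-read all messages in the 18BD34 topic from the start on the same cluster:
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic 18BD34 --from-beginning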
//StreamAPI
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
public class StreamAPI {
    public static void main(String[] args) {
        // Older variant using the deprecated KStreamBuilder API, kept for reference:
        // Properties props = new Properties();
        // props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        // props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        // props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // KStreamBuilder builder = new KStreamBuilder();
        // builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
        // KafkaStreams streams = new KafkaStreams(builder, props);
        // streams.start();

        Properties props = new Properties();
        // Unique identifier of this streams application
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        // Kafka cluster to connect to
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        // Default key/value serialization and deserialization
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Define the processing logic: stream() reads from a topic, to() writes to a topic
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("demo01").mapValues(line -> line.toString().toUpperCase()).to("demo02");
        // Build the Topology (the processing graph)
        final Topology topology = streamsBuilder.build();
        // Create and start the Kafka Streams instance
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}
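The input topic demo01 and output topic demo02 referenced in the code must exist before the application is started; the replication factor and partition counts below are only example values, and the console producer/consumer can be used to verify the upper-casing:
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic demo01
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic demo02
# Type lines into demo01, then check that they appear upper-cased on demo02:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic demo01
bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic demo02 --from-beginning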
//Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Producer {
    public static void main(String[] args) {
        // Kafka cluster configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Acknowledgement level
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 2);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // How long to wait before sending a batch, in milliseconds
        props.put("linger.ms", 1);
        // Total buffer memory in bytes
        props.put("buffer.memory", 33554432);
        // Key and value serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        // The original loop bound was lost; 100 messages is assumed here
        for (int i = 0; i < 100; i++) {
            // topic "18BD-10", partition 2, key "rua", value "test" + i
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("18BD-10", 2, "rua", "test" + i);
            kafkaProducer.send(producerRecord);
        }
        kafkaProducer.close();
    }
}
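The producer above writes to partition 2 of topic 18BD-10, so that topic must already exist with at least three partitions (the counts below are only an example); the console consumer can then be used to check the messages:
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic 18BD-10
bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic 18BD-10 --from-beginning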
//Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class Consumersss {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group
        props.put("group.id", "test");
        // Offsets are committed manually via commitSync() below, so auto commit is disabled
        props.put("enable.auto.commit", "false");
        // Auto-commit interval (only used when auto commit is enabled)
        props.put("auto.commit.interval.ms", "100");
        // Key and value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        // Manually assign specific partitions of the topic
        TopicPartition topicPartition0 = new TopicPartition("18BD-50", 0);
        TopicPartition topicPartition1 = new TopicPartition("18BD-50", 2);
        kafkaConsumer.assign(Arrays.asList(topicPartition0, topicPartition1));
        // Seek to a given offset before consuming
        // kafkaConsumer.seek(topicPartition0, 0);
        kafkaConsumer.seek(topicPartition1, 10);
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("partition: " + consumerRecord.partition());
                System.out.println("offset: " + consumerRecord.offset());
                System.out.println("key: " + consumerRecord.key());
                System.out.println("value: " + consumerRecord.value());
                System.out.println("-------------------------------");
            }
            // Commit the offsets manually
            kafkaConsumer.commitSync();
        }
    }
}
// auto.offset.reset configuration notes
// earliest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning
// latest: if a partition has a committed offset, consume from that offset; otherwise consume only data newly produced to that partition
// none: if every partition of the topic has a committed offset, consume from after those offsets; if any partition has no committed offset, throw an exception
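The committed offsets that auto.offset.reset falls back on can be inspected with the consumer-groups tool; a sketch for the group.id "test" used above (any broker address in the cluster works):
bin/kafka-consumer-groups.sh --bootstrap-server node01:9092 --describe --group test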
# Kafka cluster operation commands
## 1. Create a topic
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test
## 2. List topics
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
## 3. Produce data with the console producer
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
## 4. Consume data with the console consumer
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
## 5. Describe a topic (detailed topic information)
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
## 6. Increase the number of partitions of a topic
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
## 7. Add a topic-level configuration
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
## 8. Remove a topic-level configuration
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
## 9. Delete a topic (requires delete.topic.enable=true, as set in server.properties above)
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
// Kafka producer configuration reference:
Properties props = new Properties();
// Kafka broker addresses
props.put("bootstrap.servers", "node01:9092");
// Acknowledgement level
props.put("acks", "all");
// Number of retries
props.put("retries", 0);
// Batch size in bytes
props.put("batch.size", 16384);
// How long to wait before sending a batch, in milliseconds
props.put("linger.ms", 1);
// Total buffer memory in bytes
props.put("buffer.memory", 33554432);
// Key and value serializers
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Kafka consumer configuration reference:
Properties props = new Properties();
// Kafka broker addresses
props.put("bootstrap.servers", "hadoop-01:9092");
// Consumer group
props.put("group.id", "test");
// The next two lines enable automatic offset commits
props.put("enable.auto.commit", "true");
// Auto-commit interval
props.put("auto.commit.interval.ms", "1000");
// Key and value deserializers
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// auto.offset.reset
// earliest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning
// latest: if a partition has a committed offset, consume from that offset; otherwise consume only data newly produced to that partition
// none: if every partition of the topic has a committed offset, consume from after those offsets; if any partition has no committed offset, throw an exception