搭建Zookeeper服务

1.1 安装JDK和单机版zookeeper

  1. 首先为服务器安装JDK环境
  2. 安装zookeeper安装包并解压
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar -xvf zookeeper-3.4.10.tar.gz
  1. 创建data和log目录
  2. 创建myid文件。

    编辑myid文件,并在对应的IP的机器上输入对应的编号。如在第一台zookeeper上,我们给该myid文件内容指定就是1。如果只在单点上进行安装配置,那么只有一个server,后面讲集群的时候会有多台server故会有2,3,4…等等。 
    
  3. 复制并修改zookeeper的配置文件
    进入zookeeper-3.4.10/conf目录下,将zoo_sample.cfg文件复制一份取名zoo.cfg
    zoo.cfg这是启动时默认约定读取的

    1是指一个数字,与前面创建的myid对应即可,标志这是第几台机器,h1是我配置的映射名,大家可以直接将h1改为自己的ip,如server.1=192.168.2.101:2888:3888;

    Hosts映射配置:vi /etc/hosts

    2888 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口,2888端口简单来说就是zookeeper服务之间的通信端口;
    3888端口是zookeeper集群内部进行Leader选举时使用的通信端口(客户端连接zookeeper使用的是2181端口)

  4. 增加环境变量

    export ZOOKEEPER_HOME=/home/parallels/Desktop/kafka/zookeeper-3.4.10
    export PATH=$ZOOKEEPER_HOME/bin:$PATH
  1. 开启服务

    ./zkServer.sh start

1.2 安装集群模式的Zookeeper

服务器1 IP:10.211.55.4
服务器2 IP:10.211.55.8
服务器3 IP:10.211.55.9
  1. 修改host配置
![](http://od4y7wzt7.bkt.clouddn.com/15193051341907.jpg)
  1. 修改zoo.cfg配置文件
    在每台服务器上配置相同的配置文件

  2. 启动后查看状态

    ./zkServer.sh status

1.3 JavaDemo

//pom
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demo: five concurrent workers compete for a ZooKeeper distributed lock
 * ("/sharedlock") via Curator's InterProcessMutex, each holding it for ~1s.
 */
public class demo01 {
    public static void main(String[] args) throws Exception {
        // One count per worker; main waits until all five have used the lock.
        CountDownLatch latch = new CountDownLatch(5);
        // Exponential backoff retry: base sleep 1000 ms, at most 3 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "10.211.55.4:2181,10.211.55.8:2181,10.211.55.9:2181";
        CuratorFramework client =
                CuratorFrameworkFactory.newClient(
                        connectString,
                        5000,   // session timeout (ms)
                        3000,   // connection timeout (ms)
                        retryPolicy);
        client.start();
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < 5; i++) {
                exec.submit(new MyLock("client" + i, client, latch));
            }
            // Fix: the latch was previously never awaited, and the executor and
            // Curator client were never shut down, leaking threads and the
            // ZooKeeper session.
            latch.await();
        } finally {
            exec.shutdown();
            client.close();
        }
    }

    /** Worker that acquires the shared mutex, simulates work, then releases it. */
    static class MyLock implements Runnable {
        private String name;
        private CuratorFramework client;
        private CountDownLatch latch;

        public MyLock(String name, CuratorFramework client, CountDownLatch latch) {
            this.name = name;
            this.client = client;
            this.latch = latch;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public void run() {
            InterProcessMutex lock = new InterProcessMutex(client, "/sharedlock");
            try {
                // Wait up to 120 s for the lock; on timeout the worker simply exits
                // without counting down (demo-level best effort).
                if (lock.acquire(120, TimeUnit.SECONDS)) {
                    try {
                        System.out.println("----------" + this.name + "获得资源----------");
                        System.out.println("----------" + this.name + "正在处理资源----------");
                        Thread.sleep(1 * 1000);
                        System.out.println("----------" + this.name + "资源使用完毕----------");
                        latch.countDown();
                    } finally {
                        // Always release, even if the work above throws.
                        lock.release();
                        System.out.println("----------" + this.name + "释放----------");
                    }
                }
            } catch (InterruptedException e) {
                // Fix: restore the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

2.1 搭建Kafka

  1. 安装Kafka

    wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
  2. 修改配置文件

    1. 修改broker.id
    2. host.name
    3. listeners
    4. advertised.listeners
    5. broker.id 在三台机器上可以分别设置为110、111、112
  3. 在三个节点上启动Kafka

    ./kafka-server-start.sh config/server.properties
  4. 创建主题

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
  1. 查看主题详细

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
  2. 在Zookeeper上查看Kafka集群

    [zk: localhost:2181(CONNECTED) 5] ls /
    [admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 6] ls /brokers ----> 查看注册在zk内的kafka
    [topics, ids]
    [zk: localhost:2181(CONNECTED) 7] ls /brokers/ids
    [112, 110, 111]
    [zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112
    []
    [zk: localhost:2181(CONNECTED) 9] ls /brokers/topics
    [test]
    [zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test
    [partitions]
    [zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions
    [2, 1, 0]
    [zk: localhost:2181(CONNECTED) 12]
  1. 测试消息通信

    启动生产者和消费者
    # 生产者
    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    #消费者
    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

2.2 kafka的JavaAPI

  1. 创建maven项目。pom文件中引入kafka依赖

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.1.0</version>
    </dependency>
  2. 创建KafkaProducerTest类

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import java.util.Properties;
    /**
     * Demo producer: publishes "message_&lt;n&gt;" to the given topic every
     * two seconds, forever, using the legacy kafka.javaapi producer.
     */
    public class KafkaProducerTest implements Runnable {
        private String topic;
        private Producer<Integer, String> producer;

        public KafkaProducerTest(String topic) {
            this.topic = topic;
            // Broker list and string encoder for the legacy producer API.
            Properties config = new Properties();
            config.put("metadata.broker.list", "10.211.55.4:9092,10.211.55.8:9093,10.211.55.9:9094");
            config.put("serializer.class", "kafka.serializer.StringEncoder");
            producer = new Producer<Integer, String>(new ProducerConfig(config));
        }

        public void run() {
            int counter = 1;
            // Send a numbered message every 2 seconds, indefinitely.
            for (;;) {
                String payload = "message_" + counter;
                System.out.println("Send:" + payload);
                producer.send(new KeyedMessage<Integer, String>(topic, payload));
                counter++;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) {
            KafkaProducerTest producerTask = new KafkaProducerTest("tests");
            Thread worker = new Thread(producerTask);
            worker.start();
        }
    }
  1. 创建KafkaConsumerTest类

    import java.util.Arrays;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    /**
     * Demo consumer: subscribes to the given topic through the legacy
     * high-level consumer API and prints every message it receives.
     */
    public class KafkaConsumerTest implements Runnable {
        private String topic;

        public KafkaConsumerTest(String topic) {
            this.topic = topic;
        }

        /** Builds a connector pointed at the 3-node ZooKeeper ensemble. */
        private ConsumerConnector createConsumer() {
            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "10.211.55.4:2181,10.211.55.8:2181,10.211.55.9:2181");
            props.setProperty("group.id", "testGroup");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }

        public void run() {
            // Create the consumer and request exactly one stream for our topic.
            ConsumerConnector connector = createConsumer();
            Map<String, Integer> streamCounts = new HashMap<String, Integer>();
            streamCounts.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
                    connector.createMessageStreams(streamCounts);
            // Take the single stream we asked for and block on its iterator.
            KafkaStream<byte[], byte[]> stream = streamsByTopic.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // Keep printing newly arrived messages forever.
            while (it.hasNext()) {
                String message = new String(it.next().message());
                System.out.println("message:" + message);
            }
        }

        public static void main(String[] args) {
            KafkaConsumerTest consumerTask = new KafkaConsumerTest("tests");
            Thread worker = new Thread(consumerTask);
            worker.start();
        }
    }