文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

kafka+storm+hbase如何实现计算WordCount

2023-06-04 06:45

关注

这篇文章主要介绍了kafka+storm+hbase如何实现计算WordCount,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

kafka+storm+hbase实现计算WordCount。

(1)表名:wc

(2)列族:result

(3)RowKey:word

(4)Field:count

1、解决:

1)第一步:首先准备kafkastormhbase相关jar包。依赖如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com</groupId>

  <artifactId>kafkaSpout</artifactId>

  <version>0.0.1-SNAPSHOT</version>

   

    <dependencies>

        <dependency>

            <groupId>org.apache.storm</groupId>

            <artifactId>storm-core</artifactId>

            <version>0.9.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.storm</groupId>

            <artifactId>storm-kafka</artifactId>

            <version>0.9.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka_2.10</artifactId>

            <version>0.8.1.1</version>

            <exclusions>

                <exclusion>

                    <groupId>org.apache.zookeeper</groupId>

                    <artifactId>zookeeper</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>log4j</groupId>

                    <artifactId>log4j</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>0.99.2</version>

            <exclusions>

                <exclusion>

                    <groupId>org.slf4j</groupId>

                    <artifactId>slf4j-log4j12</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>org.apache.zookeeper</groupId>

                    <artifactId>zookeeper</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

         

       <dependency>

 

         <groupId>com.google.protobuf</groupId>

 

         <artifactId>protobuf-java</artifactId>

 

         <version>2.5.0</version>

 

        </dependency>

 

        <dependency>

            <groupId>org.apache.curator</groupId>

            <artifactId>curator-framework</artifactId>

            <version>2.5.0</version>

            <exclusions>

                <exclusion>

                    <groupId>log4j</groupId>

                    <artifactId>log4j</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>org.slf4j</groupId>

                    <artifactId>slf4j-log4j12</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

                                                                              

           <dependency>

            <groupId>jdk.tools</groupId>

            <artifactId>jdk.tools</artifactId>

            <version>1.7</version>

            <scope>system</scope>

            <systemPath>C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar</systemPath>

        </dependency>    

         

    </dependencies>

  

    <repositories>

        <repository>

            <id>central</id>

            <url>http://repo1.maven.org/maven2/</url>

            <snapshots>

                <enabled>false</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>clojars</id>

            <url>https://clojars.org/repo/</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>scala-tools</id>

            <url>http://scala-tools.org/repo-releases</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>conjars</id>

            <url>http://conjars.org/repo/</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

    </repositories>

 

    <build>

        <plugins>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.1</version>

                <configuration>

                    <source>1.6</source>

                    <target>1.6</target>

                    <encoding>UTF-8</encoding>

                    <showDeprecation>true</showDeprecation>

                    <showWarnings>true</showWarnings>

                </configuration>

            </plugin>

            <plugin>

                <artifactId>maven-assembly-plugin</artifactId>

                <configuration>

                    <descriptorRefs>

                        <descriptorRef>jar-with-dependencies</descriptorRef>

                    </descriptorRefs>

                    <archive>

                        <manifest>

                            <mainClass></mainClass>

                        </manifest>

                    </archive>

                </configuration>

                <executions>

                    <execution>

                        <id>make-assembly</id>

                        <phase>package</phase>

                        <goals>

                            <goal>single</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

</project>

 

(2)kafka发来的数据通过levelSplitbolt进行分割处理,然后再发送到下一个Bolt中。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

package com.kafka.spout;

 

import java.util.regex.Matcher;

import java.util.regex.Pattern;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

  

public class LevelSplit extends BaseBasicBolt {

  

    public void execute(Tuple tuple, BasicOutputCollector collector) {

        String words = tuple.getString(0).toString();//the cow jumped over the moon

        String []va=words.split(" ");

        for(String word : va)

        {

            collector.emit(new Values(word));

        }

         

    }

    

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }

 

}

(3)将levelSplit的Bolt发来的数据到levelCount的Bolt中进行计数处理,然后发送到hbase(Bolt)中。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

package com.kafka.spout;

 

import java.util.HashMap;

import java.util.Map;

import java.util.Map.Entry;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

  

public class LevelCount extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

 

    public void execute(Tuple tuple, BasicOutputCollector collector) {

        // TODO Auto-generated method stub

        String word = tuple.getString(0);

        Integer count = counts.get(word);

        if (count == null)

            count = 0;

        count++;

        counts.put(word, count);

 

        for (Entry<String, Integer> e : counts.entrySet()) {

            //sum += e.getValue();

            System.out.println(e.getKey()

                                "----------->" +e.getValue());

        }

        collector.emit(new Values(word, count));     

    }

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        // TODO Auto-generated method stub

         declarer.declare(new Fields("word""count"));

    }

}

(4)准备连接kafkahbase条件以及设置整个拓扑结构并且提交拓扑。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

package com.kafka.spout;

  

import java.util.HashMap;

import java.util.Map;

 

import com.google.common.collect.Maps;

 

//import org.apache.storm.guava.collect.Maps;

  

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

import backtype.storm.utils.Utils;

import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.ZkHosts;

   

public class StormKafkaTopo {

    public static void main(String[] args) {

                  

        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd""/storm""kafkaspout");

        Config conf = new Config();  

        spoutConfig.scheme =  new SchemeAsMultiScheme(new MessageScheme());   

         

        SimpleHBaseMapper mapper = new SimpleHBaseMapper();

        mapper.withColumnFamily("result");

        mapper.withColumnFields(new Fields("count"));

        mapper.withRowKeyField("word");

         

        Map<String, Object> map = Maps.newTreeMap();

        map.put("hbase.rootdir""hdfs://zeb:9000/hbase");

        map.put("hbase.zookeeper.quorum""zeb:2181,yjd:2181,ylh:2181");

         

        // hbase-bolt

        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

 

        conf.setDebug(true);

        conf.put("hbase.conf", map);

          

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout"new KafkaSpout(spoutConfig));

        builder.setBolt("split"new LevelSplit(), 1).shuffleGrouping("spout");

        builder.setBolt("count"new LevelCount(), 1).fieldsGrouping("split"new Fields("word"));

        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

         

        if(args != null && args.length > 0) {

            //提交到集群运行

            try {

                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

            catch (AlreadyAliveException e) {

                e.printStackTrace();

            catch (InvalidTopologyException e) {

                e.printStackTrace();

            }

        else {

            //本地模式运行

            LocalCluster cluster = new LocalCluster();

            cluster.submitTopology("Topotest1121", conf, builder.createTopology());

            Utils.sleep(1000000);

            cluster.killTopology("Topotest1121");

            cluster.shutdown();

        }          

    }

}

(5)在kafka端用控制台生产数据,如下:

kafka+storm+hbase如何实现计算WordCount

2、运行结果截图:

 kafka+storm+hbase如何实现计算WordCount

3、遇到的问题:

(1)把所有的工作做好后,提交了拓扑,运行代码。发生了错误1,如下:

 kafka+storm+hbase如何实现计算WordCount

解决:原来是因为依赖版本要统一的问题,最后将版本修改一致后,成功解决。

(2)发生了错误2,如下:

 kafka+storm+hbase如何实现计算WordCount

解决:原来是忘记开hbase中的HMaster和HRegionServer。启动后问题成功解决。

感谢你能够认真阅读完这篇文章,希望小编分享的“kafka+storm+hbase如何实现计算WordCount”这篇文章对大家有帮助,同时也希望大家多多支持编程网,关注编程网行业资讯频道,更多相关知识等着你来学习!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯