storm第一阶段

介绍

  • 一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。被称作”实时的hadoop”.
  • 主要用在流式的数据处理,例如数据一边进来一边转换格式。hadoop是批量离线处理,storm是实时的。
  • 使用场景:实时分析、在线机器学习,持续计算,分布式RPC,ETL等等
  • storm的典型拓扑图
  • 逻辑结构跟物理结构

安装

  1. 安装一个zookeeper集群
  2. 上传storm的安装包,解压
  3. 修改配置文件storm.yaml,下面是必须填充的
    1
    2
    3
    4
    5
    6
    7
    #所使用的zookeeper集群主机
    storm.zookeeper.servers:
    - "weekend05"
    - "weekend06"
    - "weekend07"
    #nimbus所在的主机名,协调节点或者是说主节点
    nimbus.host: "weekend05"
  • 启动storm
    • 在nimbus主机上,启动nimbusnohup ./storm nimbus,启动提供web访问的进程nohup ./storm ui
    • 在supervisor主机上nohup ./storm supervisor

java小demo

  • 随机读单词装换成大写然后再加上时间

  • 模拟spouts数据的输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RandomWordSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
//模拟一些数据
String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
//不断地往下一个组件发送tuple消息
//这里面是该spout组件的核心逻辑
@Override
public void nextTuple() {
//可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去
Random random = new Random();
int index = random.nextInt(words.length);
//通过随机数拿到一个商品名
String godName = words[index];
//将商品名封装成tuple,发送消息给下一个组件
collector.emit(new Values(godName));
//每发送一个消息,休眠500ms
Utils.sleep(500);
}
//初始化方法,在spout组件实例化时调用一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
//声明本spout组件发送出去的tuple中的数据的字段名
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("orignname"));
}
}
  • bolts转换成大写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class UpperBolt extends BaseBasicBolt{
//业务处理逻辑
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先获取到上一个组件传递过来的数据,数据在tuple里面
String godName = tuple.getString(0);
//将商品名转换成大写
String godName_upper = godName.toUpperCase();
//将转换完成的商品名发送出去
collector.emit(new Values(godName_upper));
}
//声明该bolt组件要发出去的tuple的字段
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("uppername"));
}
}
  • 添加时间后缀
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SuffixBolt extends BaseBasicBolt{
FileWriter fileWriter = null;
//在bolt组件运行过程中只会被调用一次
@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//该bolt组件的核心处理逻辑
//每收到一个tuple消息,就会被调用一次
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先拿到上一个组件发送过来的商品名称
String upper_name = tuple.getString(0);
String suffix_name = upper_name + "_itisok";
//为上一个组件发送过来的商品名称添加后缀
try {
fileWriter.write(suffix_name);
fileWriter.write("\n");
fileWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
}
  • 管理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
* 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
* @author duanhaitao@itcast.cn
*
*/
public class TopoMain {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
//将我们的spout组件设置到topology中去
//parallelism_hint :4 表示用4个excutor来执行这个组件
//setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task
builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
//将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
//.shuffleGrouping("randomspout")包含两层含义:
//1、upperbolt组件接收的tuple消息一定来自于randomspout组件
//2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping
builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
//将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
//用builder来创建一个topology
StormTopology demotop = builder.createTopology();
//配置一些topology在集群中运行时的参数
Config conf = new Config();
//这里设置的是整个demotop所占用的槽位数,也就是worker的数量
conf.setNumWorkers(4);
conf.setDebug(true);
conf.setNumAckers(0);
//将这个topology提交给storm集群运行
StormSubmitter.submitTopology("demotopo", conf, demotop);
}
}
张冲 wechat
欢迎扫一扫上面的微信关注我,一起交流!
坚持原创技术分享,您的支持将鼓励我继续创,点击打赏!