22.4.7. Initializing Storm
We will deploy Storm on these three machines: 10.10.10.21 (storm_1), 10.10.10.22 (storm_2), and 10.10.10.23 (storm_3).
- Download Storm (apache-storm-0.9.6.zip) from the official website.
- Unpack it into /usr/local/; run the same commands on all three machines:
```
[root@storm_1 wordcount]# unzip apache-storm-0.9.6.zip
[root@storm_1 wordcount]# mv apache-storm-0.9.6 /usr/local/
```
- Set up the storm.yaml configuration file. The four supervisor.slots.ports entries allow each supervisor node to run up to four worker processes:
```
[root@storm_2 wordcount]# cat /usr/local/apache-storm-0.9.6/conf/storm.yaml
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
# storm.zookeeper.servers:
#     - "server1"
#     - "server2"
#
# nimbus.host: "nimbus"
#
#
# ##### These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"

## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"

storm.zookeeper.servers:
    - "storm_1"
    - "storm_2"
    - "storm_3"

nimbus.host: "storm_1"

storm.local.dir: "/u01/storm/status"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
```
- Create the Storm runtime directory (the storm.local.dir configured above):
```
[root@storm_1 wordcount]# mkdir -p /u01/storm/status
```
- Start Storm:
```
# Node 1: start the Storm UI
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm ui > /dev/null 2>&1 &

# Node 1: start the Storm nimbus
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm nimbus > /dev/null 2>&1 &

# Node 2: start a supervisor
[root@storm_2 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &

# Node 3: start a supervisor
[root@storm_3 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &

# Run jps on each node to check service status
[root@storm_1 wordcount]# jps
2151 core
2097 QuorumPeerMain
3969 Jps
2191 nimbus
```
- Open the Storm UI in a web browser (it listens on port 8080 by default).
If you can see that page, our Storm deployment is complete.
22.4.8. Setting up streamparse (a Python framework for Storm)
streamparse is a Python framework for Storm: it packages your Python code into a JAR that can run on a Storm cluster.
Official docs: http://streamparse.readthedocs.io/en/master/quickstart.html.
(Note: topology definitions changed in streamparse 3 and later. Having talked with the author, the change was made so that streamparse could run more independently, without requiring a full Storm environment.)
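For reference, here is a sketch of what that change looks like. In streamparse 3 a topology is a plain Python class instead of a Clojure DSL file; the class-based spec() API below follows the streamparse 3 quickstart docs, and the class names mirror this chapter's project. It is illustrative only, since the rest of this walkthrough stays on the 2.x layout (where topologies live in topologies/*.clj):

```python
# Illustrative streamparse >= 3 topology definition (not used in this
# walkthrough, which relies on the 2.x Clojure DSL instead). In v3 the
# Spout/Bolt base classes are also imported from `streamparse` directly.
from streamparse import Topology

from bolts.wordcount import WordCounter
from spouts.words import WordSpout


class WordCount(Topology):
    word_spout = WordSpout.spec()
    # Two bolt executors fed by the spout (shuffle grouping by default)
    count_bolt = WordCounter.spec(inputs=[word_spout], par=2)
```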
- Set up mutual SSH trust between the three machines. Generate an SSH key pair on each of the three machines by running:
```
[root@storm_1 ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
1e:20:62:da:f5:fb:69:32:da:ac:09:ef:7c:35:a5:01 root@storm_3
The key's randomart image is:
+--[ RSA 2048]----+
|                 |
|       E         |
|      o o ..     |
|     + o o .. .  |
|. . .   S+       |
|     o+.         |
|  . ....         |
| + ++...         |
|  .B+o+o         |
+-----------------+
```
Running the command above creates an id_rsa.pub file under ~/.ssh/ on each host. Collect the public keys from all three machines into a single authorized_keys file, and make sure this file has the same contents on all three machines:
```
# storm_1 node
[root@storm_1 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1

# storm_2 node
[root@storm_2 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2

# storm_3 node
[root@storm_3 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3

# Contents of the authorized_keys file on every node
[root@storm_1 ~]# cat ~/.ssh/authorized_keys
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3
```
- Create the SSH config file (run on all three machines):
```
[root@storm_1 wordcount]# touch /root/.ssh/config
```
- Download the lein script into /usr/local/bin and make it executable (run on all three machines):
```
[root@storm_1 wordcount]# wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein
[root@storm_1 wordcount]# mv lein /usr/local/bin/
[root@storm_1 wordcount]# chmod 755 /usr/local/bin/lein
```
- Install streamparse (run on all three machines). Since this walkthrough targets Storm 0.9.6 and defines topologies with the 2.x Clojure DSL (the topologies/wordcount.clj referenced below), a streamparse 2.x release is assumed:
```
[root@storm_1 wordcount]# pip install streamparse
```
- Create the storm_project directory and bootstrap a simple Storm project (on storm_2). Don't create it on the node running Nimbus: running the project there later would cause a port conflict.
```
[root@storm_2 ~]# mkdir -p /u01/storm_project
[root@storm_2 ~]# cd /u01/storm_project/
[root@storm_2 storm_project]# pwd
/u01/storm_project
[root@storm_2 ~]# sparse quickstart wordcount
Creating your wordcount streamparse project...
    create    wordcount
    create    wordcount/.gitignore
    create    wordcount/config.json
    create    wordcount/fabfile.py
    create    wordcount/project.clj
    create    wordcount/README.md
    create    wordcount/src
    create    wordcount/src/bolts/
    create    wordcount/src/bolts/__init__.py
    create    wordcount/src/bolts/wordcount.py
    create    wordcount/src/spouts/
    create    wordcount/src/spouts/__init__.py
    create    wordcount/src/spouts/words.py
    create    wordcount/topologies
    create    wordcount/topologies/wordcount.py
    create    wordcount/virtualenvs
    create    wordcount/virtualenvs/wordcount.txt
Done.

Try running your topology locally with:

    cd wordcount
    sparse run
```
- Set up the JSON configuration file (on storm_2):
```
[root@storm_2 wordcount]# cat /u01/storm_project/wordcount/config.json
{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "root",
            "nimbus": "storm_1",
            "workers": [
                "storm_1",
                "storm_2",
                "storm_3"
            ],
            "log": {
                "path": "/tmp/storm/stream/log",
                "file": "pystorm_{topology_name}_{component_name}_{task_id}_{pid}.log",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "use_ssh_for_nimbus": true,
            "virtualenv_root": "/tmp/storm/stream/virtualenvs"
        }
    }
}
```
- Create the log and virtualenv directories referenced by config.json (run on all three machines):
```
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/log
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/virtualenvs
```
- Submit the wordcount topology to the Storm cluster (on storm_2):
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse submit
[storm_1] Executing task '_create_or_update_virtualenv'
[storm_2] Executing task '_create_or_update_virtualenv'

... omit ...

[storm_1] run: rm /tmp/streamparse_requirements-oD8qdm4We.txt
[storm_3] out:
[storm_3] run: rm /tmp/streamparse_requirements-5greXfqjW.txt
Cleaning from prior builds...
# press Enter here
Creating topology Uber-JAR...
# press Enter here
Uber-JAR created: /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar
Deploying "wordcount" topology...
ssh tunnel to Nimbus storm_1:6627 established.
Routing Python logging to /tmp/storm/stream/log.
Running lein command to submit topology to nimbus:
lein run -m streamparse.commands.submit_topology/-main topologies/wordcount.clj --option 'topology.workers=2' --option 'topology.acker.executors=2' --option 'topology.python.path="/tmp/storm/stream/virtualenvs/wordcount/bin/python"' --option 'streamparse.log.path="/tmp/storm/stream/log"' --option 'streamparse.log.max_bytes=1000000' --option 'streamparse.log.backup_count=10' --option 'streamparse.log.level="info"'
WARNING: You're currently running as root; probably by accident.
Press control-C to abort or Enter to continue as root.
Set LEIN_ROOT to disable this warning.
# press Enter here
{:option {streamparse.log.level info, streamparse.log.backup_count 10, streamparse.log.max_bytes 1000000, streamparse.log.path /tmp/storm/stream/log, topology.python.path /tmp/storm/stream/virtualenvs/wordcount/bin/python, topology.acker.executors 2, topology.workers 2}, :debug false, :port 6627, :host localhost, :help false}
1604 [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
1620 [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3853 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3854 [main] INFO  backtype.storm.StormSubmitter - Submitting topology wordcount in distributed mode with conf {"streamparse.log.backup_count":10,"streamparse.log.path":"\/tmp\/storm\/stream\/log","topology.python.path":"\/tmp\/storm\/stream\/virtualenvs\/wordcount\/bin\/python","topology.debug":false,"nimbus.thrift.port":6627,"topology.max.spout.pending":5000,"nimbus.host":"localhost","topology.workers":2,"topology.acker.executors":2,"streamparse.log.max_bytes":1000000,"streamparse.log.level":"info","topology.message.timeout.secs":60}
4487 [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: wordcount
```
If you see output like the above, the deployment succeeded. (Because use_ssh_for_nimbus is enabled, streamparse tunnels to Nimbus over SSH, which is why the submitter reports nimbus.host as localhost.)
- Confirm that the wordcount topology is now running in Storm (for example, via the Storm UI).
- Stop the wordcount topology in Storm:
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse kill -n wordcount
WARNING: You're currently running as root; probably by accident.
Press control-C to abort or Enter to continue as root.
Set LEIN_ROOT to disable this warning.
5180 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
Killed topology: wordcount
```
Output like the above means the wordcount topology has been stopped and removed from the Storm cluster.
22.4.9. Writing streamparse code
Since this is a sample program, we will modify the code inside the wordcount project created earlier. Only the spout and bolt code needs to change.
We need the Python Kafka and Python MongoDB client modules; install them with the following commands:
```
# Install into the system Python (used when calling sparse run)
pip install pykafka
pip install pymongo

# Install into the streamparse Storm Python virtualenv (used by sparse submit)
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pykafka
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pymongo
```
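Before wiring these modules into the topology, it's worth a quick smoke test that both endpoints are reachable from the Storm nodes. A minimal sketch, assuming the Kafka broker (10.10.10.11:9092), the test topic, and the rs_12 MongoDB replica set used elsewhere in this chapter:

```python
# Quick reachability check for the Kafka broker and the MongoDB replica
# set that the spout and bolt below will talk to.
from pykafka import KafkaClient
from pymongo import MongoClient

kafka = KafkaClient(hosts="10.10.10.11:9092")
# The topic list should include b"test" if Kafka is set up as in this chapter
print("Kafka topics:", list(kafka.topics.keys()))

mongo = MongoClient("10.10.10.12:27017,10.10.10.12:27018,10.10.10.12:27019",
                    replicaset="rs_12")
# A healthy replica set answers {u'ok': 1.0}
print("MongoDB ping:", mongo.admin.command("ping"))
```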
- words.py (the spout)
words.py continuously consumes messages from Kafka and emits each one to the next receiver (a bolt).
```
[root@storm_2 spouts]# pwd
/u01/storm_project/wordcount/src/spouts
[root@storm_2 spouts]#
[root@storm_2 spouts]#
[root@storm_2 spouts]# cat words.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import itertools

from streamparse.spout import Spout
from pykafka import KafkaClient
import simplejson as json

import sys
reload(sys)
sys.setdefaultencoding('utf-8')


class WordSpout(Spout):

    def initialize(self, stormconf, context):
        # self.words = itertools.cycle(['dog', 'cat',
        #                               'zebra', 'elephant'])
        client = KafkaClient(hosts="10.10.10.11:9092")
        topic = client.topics[b"test"]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="storm_1:2181,storm_2:2181,storm_3:2181"
        )

    def next_tuple(self):
        # word = next(self.words)
        # self.emit([word])
        message = self.balanced_consumer.consume()
        # Convert the Logstash JSON string into a dict
        log_info = json.loads(message.value)
        word = log_info["message"]
        with open("/tmp/storm.log", "a") as f:
            f.write(word)
        self.emit([word])
```
- wordcount.py (the bolt)
wordcount.py receives the JSON strings emitted by words.py, parses each one into a Python dict, and writes the resulting order statistics to MongoDB (10.10.10.12).
```
[root@storm_2 bolts]# pwd
/u01/storm_project/wordcount/src/bolts
[root@storm_2 bolts]#
[root@storm_2 bolts]# cat wordcount.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

from collections import Counter

from streamparse.bolt import Bolt
import simplejson as json
from pymongo import MongoClient

import sys
reload(sys)
sys.setdefaultencoding('utf-8')


class WordCounter(Bolt):

    def initialize(self, conf, ctx):
        # self.counts = Counter()
        client = MongoClient(
            b"10.10.10.12:27017,10.10.10.12:27018,10.10.10.12:27019",
            replicaset="rs_12")
        # Get the shop database (it holds the order_stat collection)
        self.db = client.shop

    def process(self, tup):
        # Get the string passed in from the spout
        word = tup.values[0]
        # self.counts[word] += 1
        # self.emit([word, self.counts[word]])
        # self.log('%s: %d' % (word, self.counts[word]))

        # Parse the string from the spout into a dict
        order_info = json.loads(word)

        # Look up the user's statistics by the user_name passed in via Kafka
        condition = {"user_name": order_info["user_name"]}
        order_stat_info = self.db.order_stat.find_one(condition)

        ## Insert if no order_stat_info record exists, otherwise update it
        # 1. No existing record
        if not order_stat_info:
            order_stat_info_new = {
                "user_name": order_info.get("user_name", "Unknown"),
                "order_num": 1,
                "total_price": order_info.get("price", 0.00),
                "min_order_price": order_info.get("price", 0.00),
                "max_order_price": order_info.get("price", 0.00),
                "min_order": order_info.get("order_id", 0),
                "max_order": order_info.get("order_id", 0),
            }
            self.db.order_stat.insert_one(order_stat_info_new)
        # 2. Existing record
        else:
            min_order_price = min(order_stat_info["min_order_price"],
                                  order_info.get("price", 0.00))
            max_order_price = max(order_stat_info["max_order_price"],
                                  order_info.get("price", 0.00))
            min_order = order_stat_info["min_order"]
            max_order = order_stat_info["max_order"]

            # Record the order id of the lowest-priced order
            if min_order_price == order_info.get("price", 0.00):
                min_order = order_info.get("order_id", min_order)
            # Record the order id of the highest-priced order
            if max_order_price == order_info.get("price", 0.00):
                max_order = order_info.get("order_id", max_order)

            # Build the updated statistics
            order_stat_info_new = {
                "order_num": order_stat_info["order_num"] + 1,
                "total_price": order_stat_info["total_price"] + order_info.get("price", 0.00),
                "min_order_price": min_order_price,
                "max_order_price": max_order_price,
                "min_order": min_order,
                "max_order": max_order
            }
            # Update the record
            self.db.order_stat.update_one({"_id": order_stat_info["_id"]},
                                          {"$set": order_stat_info_new})
```
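One caveat with the bolt above: find_one followed by insert_one/update_one is not atomic, so two bolt executors handling orders for the same user concurrently can race. A hedged alternative sketch using MongoDB's $inc/$min/$max update operators with upsert=True (all supported by the MongoDB 3.2 used here) folds most of the logic into a single atomic statement; the min_order/max_order ids are omitted because they depend on which price "won" and would need extra handling:

```python
# Sketch of an atomic alternative to the find-then-write pattern above.
# One upsert per order means concurrent executors cannot interleave.
from pymongo import MongoClient

client = MongoClient("10.10.10.12:27017,10.10.10.12:27018,10.10.10.12:27019",
                     replicaset="rs_12")
db = client.shop


def record_order(order_info):
    price = order_info.get("price", 0.00)
    db.order_stat.update_one(
        {"user_name": order_info.get("user_name", "Unknown")},
        {
            "$inc": {"order_num": 1, "total_price": price},
            "$min": {"min_order_price": price},   # creates the field on upsert
            "$max": {"max_order_price": price},   # creates the field on upsert
        },
        upsert=True,
    )
```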
With the code above in place, it is time to test it.
- Run the topology locally with streamparse for testing
Since we don't yet know whether the code is correct, we debug with sparse run instead of submitting straight to the Storm cluster with sparse submit:
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]#
[root@storm_2 wordcount]# sparse run
... Omit ...
8653 [Thread-15-count-bolt] INFO  backtype.storm.task.ShellBolt - Launched subprocess with pid 3719
8703 [Thread-16-word-spout] INFO  backtype.storm.spout.ShellSpout - Launched subprocess with pid 3717
8706 [Thread-13-count-bolt] INFO  backtype.storm.task.ShellBolt - Start checking heartbeat...
8706 [Thread-13-count-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt count-bolt:(3)
8708 [Thread-15-count-bolt] INFO  backtype.storm.task.ShellBolt - Start checking heartbeat...
8708 [Thread-15-count-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt count-bolt:(4)
8708 [Thread-16-word-spout] INFO  backtype.storm.daemon.executor - Opened spout word-spout:(5)
8715 [Thread-16-word-spout] INFO  backtype.storm.daemon.executor - Activating spout word-spout:(5)
8715 [Thread-16-word-spout] INFO  backtype.storm.spout.ShellSpout - Start checking heartbeat...
```
- Append some order records to the file watched by Logstash (10.10.10.11):
```
echo '{"order_id":1, "price":20, "user_name":"Bob", "goods_name":"good_name2"}' > /tmp/orders.log
echo '{"order_id":2, "price":120, "user_name":"Bob", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1120, "user_name":"Bob", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11120, "user_name":"Bob", "goods_name":"good_name3"}' >> /tmp/orders.log
echo '{"order_id":1, "price":10, "user_name":"Tom", "goods_name":"good_name2"}' >> /tmp/orders.log
echo '{"order_id":2, "price":110, "user_name":"Tom", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1110, "user_name":"Tom", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11110, "user_name":"Tom", "goods_name":"good_name3"}' >> /tmp/orders.log
```
- Check the order statistics in MongoDB (10.10.10.12):
```
[root@normal_12 ~]# /u01/mongodb_27018/client_mongodb.sh
MongoDB shell version: 3.2.5
connecting to: 10.10.10.12:27018/test
(test) 01:01:10>
(test) 01:01:11> use shop
switched to db shop
(shop) 01:01:16>
(shop) 01:22:32> db.order_stat.find()
{ "_id" : ObjectId("5734bba0172d290f86e2d2e4"), "total_price" : 12380, "min_order_price" : 20, "min_order" : 1, "order_num" : 4, "max_order_price" : 11120, "user_name" : "Bob", "max_order" : 4 }
{ "_id" : ObjectId("5734bbf1172d290f844d2fdc"), "total_price" : 12230, "min_order_price" : 10, "min_order" : 1, "order_num" : 3, "max_order_price" : 11110, "user_name" : "Tom", "max_order" : 4 }
```
- Finally, submit the project to Storm:
```
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]#
[root@storm_2 wordcount]# sparse submit
```
That completes building a Storm environment and developing on it with Python.
22.4.10. Summary
In reality, many systems are neither purely OLTP nor purely OLAP but a blend of the two. When OLAP-style work creeps into an OLTP system and you rely on database queries alone, the OLTP side slows down, because the queries become large and complex. Problems like this need to be solved at the architecture level. Storm and Spark are both widely used for real-time computation today, so if you face a scenario like the one above, consider giving your system this kind of "makeover".