MySQL Application Architecture Optimization: Real-Time Data Processing (2)

Author: HH | Categories: MySQL, python

22.4.7. Initializing Storm

Here we deploy Storm on three machines: storm_1 (10.10.10.21), storm_2 (10.10.10.22), and storm_3 (10.10.10.23).

  • Download Storm (apache-storm-0.9.6.zip) from the official website
  • Unpack it under /usr/local/; run the same commands on all three machines
[root@storm_1 wordcount]# unzip apache-storm-0.9.6.zip
[root@storm_1 wordcount]# mv apache-storm-0.9.6 /usr/local/
  • Edit the YAML configuration file
[root@storm_2 wordcount]# cat /usr/local/apache-storm-0.9.6/conf/storm.yaml 
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 
########### These MUST be filled in for a storm configuration
# storm.zookeeper.servers:
#     - "server1"
#     - "server2"
# 
# nimbus.host: "nimbus"
# 
# 
# ##### These may optionally be filled in:
#    
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"
 
## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
 
storm.zookeeper.servers:
    - "storm_1"
    - "storm_2"
    - "storm_3"
nimbus.host: "storm_1"
storm.local.dir: "/u01/storm/status"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
  • Create the Storm runtime directory
[root@storm_1 wordcount]# mkdir -p /u01/storm/status
  • Start Storm
# Node 1: start the Storm UI
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm ui > /dev/null 2>&1 &
 
# Node 1: start Storm nimbus
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm nimbus > /dev/null 2>&1 &
 
# Node 2: start supervisor
[root@storm_2 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &
 
# Node 3: start supervisor
[root@storm_3 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &
 
# Run jps on each node to check the service status
[root@storm_1 wordcount]# jps
2151 core
2097 QuorumPeerMain
3969 Jps
2191 nimbus
  • Open the Storm UI in a web browser (served by the ui process on storm_1, port 8080 by default)


Once the Storm UI page loads, our Storm deployment is complete.
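The jps check in the startup step can also be scripted. The following is a minimal sketch and not part of the original setup: it parses jps output and reports which expected daemons are missing on a node. The function names and the EXPECTED mapping are assumptions for illustration, based on the layout used in this section (nimbus and the UI process "core" on storm_1, a supervisor on storm_2 and storm_3, ZooKeeper's QuorumPeerMain on every node).

```python
# Hypothetical helper: compare `jps` output with the daemons expected on a node.
# The mapping below is an assumption derived from the deployment described here.
EXPECTED = {
    "storm_1": {"QuorumPeerMain", "nimbus", "core"},
    "storm_2": {"QuorumPeerMain", "supervisor"},
    "storm_3": {"QuorumPeerMain", "supervisor"},
}


def parse_jps(output):
    """Return the set of process names found in `jps` output ("<pid> <name>" lines)."""
    names = set()
    for line in output.splitlines():
        parts = line.split()
        if len(parts) >= 2:
            names.add(parts[1])
    return names


def missing_daemons(node, jps_output):
    """Return the sorted list of expected daemons that are not running on the node."""
    return sorted(EXPECTED[node] - parse_jps(jps_output))


# The jps output captured on storm_1 in this section
sample = """2151 core
2097 QuorumPeerMain
3969 Jps
2191 nimbus"""

print(missing_daemons("storm_1", sample))  # []
print(missing_daemons("storm_2", sample))  # ['supervisor']
```

An empty list means every expected daemon was found; anything else names the process that still needs to be started on that node.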

22.4.8. Building streamparse (a Python framework for Storm)

streamparse is a Python framework for Storm. It packages Python code into a JAR that runs on Storm.

Official site: http://streamparse.readthedocs.io/en/master/quickstart.html

(PS: the topology format changed in streamparse 3 and later. I asked the author about this; the change was made so that streamparse can run more independently, without depending on the Storm environment.)

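  • Set up SSH trust among the three machines: generate an SSH key pair on each of them by running the following command on every machine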
[root@storm_1 ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
 
Enter file in which to save the key (/root/.ssh/id_rsa): 
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
1e:20:62:da:f5:fb:69:32:da:ac:09:ef:7c:35:a5:01 root@storm_3
The key's randomart image is:
+--[ RSA 2048]----+
|                 |
|      E          |
|  o o ..         |
| + o o .. .      |
|. .   . S+       |
|       o+.       |
|  .   ....       |
|   + ++...       |
|   .B+o+o        |
+-----------------+

After the command above has run, an id_rsa.pub file exists under ~/.ssh/ on each host. Copy the public keys of all three machines into a single file (~/.ssh/authorized_keys) and make sure this file has identical content on all three machines.

# storm_1 node
[root@storm_1 ~]# cat ~/.ssh/id_rsa.pub 
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1
 
# storm_2 node
[root@storm_2 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2
 
# storm_3 node
[root@storm_3 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3
 
# Contents of the authorized_keys file on every node
[root@storm_1 ~]# cat ~/.ssh/authorized_keys 
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3
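The copy-and-merge step can also be scripted. Below is a minimal sketch, assuming the three id_rsa.pub files have already been collected in one place; the function name and the shortened sample keys are hypothetical. It builds one authorized_keys body that can be written unchanged to all three nodes, skipping blank lines and duplicate keys.

```python
def build_authorized_keys(public_keys):
    """Merge public-key lines into one authorized_keys body without duplicates."""
    seen = set()
    merged = []
    for key in public_keys:
        key = key.strip()
        if key and key not in seen:
            seen.add(key)
            merged.append(key)
    # One key per line, each line terminated by a newline
    return "".join(key + "\n" for key in merged)


# Shortened placeholder keys, for illustration only
keys = [
    "ssh-rsa AAAA...1 root@storm_1\n",
    "ssh-rsa AAAA...2 root@storm_2\n",
    "ssh-rsa AAAA...3 root@storm_3\n",
    "ssh-rsa AAAA...1 root@storm_1\n",  # duplicate, ignored
]
content = build_authorized_keys(keys)
print(content)
```

sshd typically refuses keys when ~/.ssh or authorized_keys is writable by other users, so restrictive permissions (for example 700 on the directory and 600 on the file) are a safe choice.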
  • Create the SSH config file (run on all three machines)
[root@storm_1 wordcount]# touch /root/.ssh/config
  • Download the lein script into /usr/local/bin and make it executable (run on all three machines)
[root@storm_1 wordcount]# wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein
[root@storm_1 wordcount]# mv lein /usr/local/bin/
[root@storm_1 wordcount]# chmod 755 /usr/local/bin/lein
  • Install streamparse (run on all three machines)
[root@storm_1 wordcount]# pip install streamparse
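  • Create the storm_project directory and start a simple Storm project (on storm_2). Do not create it on the node where Storm runs Nimbus, because running the Storm project there later would cause port conflicts.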
[root@storm_2 ~]# mkdir -p /u01/storm_project
[root@storm_2 ~]# cd /u01/storm_project/
[root@storm_2 storm_project]# pwd
/u01/storm_project
 
[root@storm_2 storm_project]# sparse quickstart wordcount
Creating your wordcount streamparse project...
    create    wordcount
    create    wordcount/.gitignore
    create    wordcount/config.json
    create    wordcount/fabfile.py
    create    wordcount/project.clj
    create    wordcount/README.md
    create    wordcount/src
    create    wordcount/src/bolts/
    create    wordcount/src/bolts/__init__.py
    create    wordcount/src/bolts/wordcount.py
    create    wordcount/src/spouts/
    create    wordcount/src/spouts/__init__.py
    create    wordcount/src/spouts/words.py
    create    wordcount/topologies
    create    wordcount/topologies/wordcount.py
    create    wordcount/virtualenvs
    create    wordcount/virtualenvs/wordcount.txt
Done.
 
Try running your topology locally with:
 
    cd wordcount
    sparse run
  • Edit the JSON configuration file (on storm_2)
[root@storm_2 wordcount]# cat /u01/storm_project/wordcount/config.json 
{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "root",
            "nimbus": "storm_1",
            "workers": [
                "storm_1",
                "storm_2",
                "storm_3"
            ],
            "log": {
                "path": "/tmp/storm/stream/log",
                "file": "pystorm_{topology_name}_{component_name}_{task_id}_{pid}.log",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "use_ssh_for_nimbus": true,
            "virtualenv_root": "/tmp/storm/stream/virtualenvs"
        }
    }
}
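Before submitting, it can help to sanity-check config.json. The sketch below is an illustration and not part of streamparse: the function name and the list of required keys are assumptions derived from the file shown above. It validates one environment and returns the remote directories (log path and virtualenv root) that need to exist on the workers.

```python
import json

# Assumed from the config.json shown above; not an official streamparse schema
REQUIRED_KEYS = ("user", "nimbus", "workers", "log", "virtualenv_root")


def check_env(config_text, env_name="prod"):
    """Validate one environment of a streamparse-style config.json.

    Returns the directories that must exist on every worker and raises
    ValueError when a required key is missing or no workers are listed.
    """
    env = json.loads(config_text)["envs"][env_name]
    missing = [key for key in REQUIRED_KEYS if key not in env]
    if missing:
        raise ValueError("missing keys in env %r: %s" % (env_name, ", ".join(missing)))
    if not env["workers"]:
        raise ValueError("env %r lists no workers" % env_name)
    return [env["log"]["path"], env["virtualenv_root"]]


config_text = """
{
    "envs": {
        "prod": {
            "user": "root",
            "nimbus": "storm_1",
            "workers": ["storm_1", "storm_2", "storm_3"],
            "log": {"path": "/tmp/storm/stream/log"},
            "virtualenv_root": "/tmp/storm/stream/virtualenvs"
        }
    }
}
"""
print(check_env(config_text))
```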
  • Create the required directories (run on all three machines)
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/log 
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/virtualenvs
  • Submit the wordcount program to the Storm cluster (on storm_2)
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse submit
[storm_1] Executing task '_create_or_update_virtualenv'
[storm_2] Executing task '_create_or_update_virtualenv'
 
... omit ...
 
[storm_1] run: rm /tmp/streamparse_requirements-oD8qdm4We.txt
[storm_3] out: 
 
[storm_3] run: rm /tmp/streamparse_requirements-5greXfqjW.txt
Cleaning from prior builds...
# Press Enter here
 
Creating topology Uber-JAR...
# Press Enter here
 
Uber-JAR created: /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar
Deploying "wordcount" topology...
ssh tunnel to Nimbus storm_1:6627 established.
Routing Python logging to /tmp/storm/stream/log.
Running lein command to submit topology to nimbus:
lein run -m streamparse.commands.submit_topology/-main topologies/wordcount.clj --option 'topology.workers=2' --option 'topology.acker.executors=2' --option 'topology.python.path="/tmp/storm/stream/virtualenvs/wordcount/bin/python"' --option 'streamparse.log.path="/tmp/storm/stream/log"' --option 'streamparse.log.max_bytes=1000000' --option 'streamparse.log.backup_count=10' --option 'streamparse.log.level="info"'
WARNING: You're currently running as root; probably by accident.
Press control-C to abort or Enter to continue as root.
Set LEIN_ROOT to disable this warning.
# Press Enter here
 
 
{:option {streamparse.log.level info, streamparse.log.backup_count 10, streamparse.log.max_bytes 1000000, streamparse.log.path /tmp/storm/stream/log, topology.python.path /tmp/storm/stream/virtualenvs/wordcount/bin/python, topology.acker.executors 2, topology.workers 2}, :debug false, :port 6627, :host localhost, :help false}
1604 [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
1620 [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3853 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3854 [main] INFO  backtype.storm.StormSubmitter - Submitting topology wordcount in distributed mode with conf {"streamparse.log.backup_count":10,"streamparse.log.path":"\/tmp\/storm\/stream\/log","topology.python.path":"\/tmp\/storm\/stream\/virtualenvs\/wordcount\/bin\/python","topology.debug":false,"nimbus.thrift.port":6627,"topology.max.spout.pending":5000,"nimbus.host":"localhost","topology.workers":2,"topology.acker.executors":2,"streamparse.log.max_bytes":1000000,"streamparse.log.level":"info","topology.message.timeout.secs":60}
4487 [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: wordcount

If the output looks like the above, the deployment succeeded.

  • Confirm that the wordcount program has been deployed to Storm (for example, in the topology list of the Storm UI)


  • Stop the wordcount program in Storm
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse kill -n wordcount
WARNING: You're currently running as root; probably by accident.
Press control-C to abort or Enter to continue as root.
Set LEIN_ROOT to disable this warning.
 
5180 [main] INFO  backtype.storm.thrift - Connecting to Nimbus at localhost:6627
Killed topology:  wordcount

This output means the wordcount program has been stopped and removed from the Storm cluster.


22.4.9. Writing the streamparse code

Since this is a sample program, we simply modify the code in the wordcount project created earlier. Only the spout and bolt code needs to change.

We need to install the Python modules for Kafka and MongoDB. Run the following commands:

# Install into the system Python; these modules are used when running sparse run
pip install pykafka
pip install pymongo
# Install into the streamparse Storm Python virtualenv (used by sparse submit)
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pykafka
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pymongo
  • words.py code (spout)

words.py continuously consumes the messages produced to Kafka and emits each one to the next receiver (a bolt).

[root@storm_2 spouts]# pwd
/u01/storm_project/wordcount/src/spouts
[root@storm_2 spouts]# 
[root@storm_2 spouts]# 
[root@storm_2 spouts]# cat words.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
 
import itertools
from streamparse.spout import Spout
from pykafka import KafkaClient
import simplejson as json
import sys
  
reload(sys)
sys.setdefaultencoding('utf-8')
 
class WordSpout(Spout):
 
    def initialize(self, stormconf, context):
        # self.words = itertools.cycle(['dog', 'cat',
        #                               'zebra', 'elephant'])
        client = KafkaClient(hosts="10.10.10.11:9092")
        topic = client.topics[b"test"]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="storm_1:2181,storm_2:2181,storm_3:2181"
        )
 
    def next_tuple(self):
        # word = next(self.words)
        # self.emit([word])
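        message = self.balanced_consumer.consume()
        # Parse the JSON string produced by Logstash into a dict
        log_info = json.loads(message.value)
        word = log_info["message"]
        # Debug aid: append each raw message to a local file, one per line
        with open("/tmp/storm.log", "a") as f:
            f.write(word + "\n")
        self.emit([word])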
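The parsing in next_tuple relies on Logstash wrapping each log line in a JSON envelope whose "message" field holds the original line. Here is a minimal sketch of that unwrapping, using the standard json module instead of simplejson. The function name and the sample envelope (including its "@timestamp" value) are assumptions for illustration.

```python
import json


def extract_message(raw_value):
    """Return the original log line carried in a Logstash JSON envelope."""
    if isinstance(raw_value, bytes):
        raw_value = raw_value.decode("utf-8")
    envelope = json.loads(raw_value)
    return envelope["message"]


# Hypothetical envelope: the order record is itself a JSON string inside "message"
order_line = '{"order_id":1, "price":20, "user_name":"Bob", "goods_name":"good_name2"}'
raw = json.dumps({"message": order_line,
                  "@timestamp": "2016-05-12T17:00:00Z"}).encode("utf-8")

word = extract_message(raw)   # what the spout emits
order = json.loads(word)      # what the bolt parses later
print(order["user_name"], order["price"])
```

The payload is therefore decoded twice: once by the spout to strip the envelope, and once by the bolt to read the order fields.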
  • wordcount.py code (bolt)

wordcount.py receives the messages (JSON strings) sent by words.py, parses each one into a Python dict, aggregates the data, and stores the result in MongoDB (10.10.10.12).

[root@storm_2 bolts]# pwd
/u01/storm_project/wordcount/src/bolts
[root@storm_2 bolts]# 
[root@storm_2 bolts]# cat wordcount.py
# -*- coding:utf-8 -*-
 
from __future__ import absolute_import, print_function, unicode_literals
 
from collections import Counter
from streamparse.bolt import Bolt
import simplejson as json
from pymongo import MongoClient
import sys
  
reload(sys)
sys.setdefaultencoding('utf-8')
 
 
class WordCounter(Bolt):
 
    def initialize(self, conf, ctx):
        # self.counts = Counter()
 
        client = MongoClient(b"10.10.10.12:27017,10.10.10.12:27018,10.10.10.12:27019",
                             replicaset="rs_12")
        # Get the shop database, which holds the order_stat collection
        self.db = client.shop
         
 
    def process(self, tup):
        # Get the string passed in from the spout
        word = tup.values[0]
        # self.counts[word] += 1
        # self.emit([word, self.counts[word]])
        # self.log('%s: %d' % (word, self.counts[word]))
 
        # Parse the string from the spout into a dict
        order_info = json.loads(word)
 
        # Look up this user's statistics by the user_name that came through Kafka
        condition = {"user_name": order_info["user_name"]}
        order_stat_info = self.db.order_stat.find_one(condition)
         
        ## Insert if order_stat_info is empty, otherwise update
        # 1. No existing record
        if not order_stat_info:
            order_stat_info_new = {
                "user_name": order_info.get("user_name", "Unknow"),
                "order_num": 1,
                "total_price": order_info.get("price", 0.00),
                "min_order_price": order_info.get("price", 0.00),
                "max_order_price": order_info.get("price", 0.00),
                "min_order": order_info.get("order_id", 0),
                "max_order": order_info.get("order_id", 0),
            }
            self.db.order_stat.insert_one(order_stat_info_new)
        # 2. Existing record
        else:
            min_order_price = min(order_stat_info["min_order_price"],
                                  order_info.get("price", 0.00))
            max_order_price = max(order_stat_info["max_order_price"],
                                  order_info.get("price", 0.00))
            min_order = order_stat_info["min_order"]
            max_order = order_stat_info["max_order"]
             
            # Set the id of the order with the lowest price
            if min_order_price == order_info.get("price", 0.00):
                min_order = order_info.get("order_id", min_order)
            # Set the id of the order with the highest price
            if max_order_price == order_info.get("price", 0.00):
                max_order = order_info.get("order_id", max_order)
            # Build the updated fields
            order_stat_info_new = {
                "order_num": order_stat_info["order_num"] + 1,
                "total_price": order_stat_info["total_price"] + 
                               order_info.get("price", 0.00),
                "min_order_price": min_order_price,
                "max_order_price": max_order_price,
                "min_order": min_order,
                "max_order": max_order
            }
            # Apply the update
            self.db.order_stat.update_one({"_id": order_stat_info["_id"]},
                                          {"$set": order_stat_info_new})
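The update rules of the bolt can be tried without MongoDB. The sketch below keeps the per-user statistics in a plain dict instead of the order_stat collection; the function name is hypothetical. It follows the same rules as process: a new order takes over min_order or max_order whenever its price equals the new minimum or maximum.

```python
def update_stats(stats, order):
    """Fold one order into the per-user statistics dict and return the dict."""
    user = order["user_name"]
    price = order.get("price", 0.00)
    order_id = order.get("order_id", 0)
    stat = stats.get(user)
    if stat is None:
        # First order seen for this user: every statistic starts from this order
        stats[user] = {
            "user_name": user, "order_num": 1, "total_price": price,
            "min_order_price": price, "max_order_price": price,
            "min_order": order_id, "max_order": order_id,
        }
        return stats
    stat["order_num"] += 1
    stat["total_price"] += price
    if price <= stat["min_order_price"]:
        stat["min_order_price"], stat["min_order"] = price, order_id
    if price >= stat["max_order_price"]:
        stat["max_order_price"], stat["max_order"] = price, order_id
    return stats


# Bob's four sample orders from the test data used in this section
orders = [
    {"order_id": 1, "price": 20, "user_name": "Bob"},
    {"order_id": 2, "price": 120, "user_name": "Bob"},
    {"order_id": 3, "price": 1120, "user_name": "Bob"},
    {"order_id": 4, "price": 11120, "user_name": "Bob"},
]
stats = {}
for order in orders:
    update_stats(stats, order)
print(stats["Bob"]["order_num"], stats["Bob"]["total_price"])  # 4 12380
```

Note that in the bolt, find_one followed by update_one is a read-modify-write sequence. If more than one bolt task handles the same user at the same time, concurrent updates could overwrite each other, so routing all tuples of one user to the same task is worth considering.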

With the code above written, the next step is to test it.

  • Run streamparse for testing

Since we do not yet know whether our code is correct, we debug it with sparse run instead of submitting it straight to the Storm environment with sparse submit.

[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# 
[root@storm_2 wordcount]# sparse run
... Omit ...
8653 [Thread-15-count-bolt] INFO  backtype.storm.task.ShellBolt - Launched subprocess with pid 3719
8703 [Thread-16-word-spout] INFO  backtype.storm.spout.ShellSpout - Launched subprocess with pid 3717
8706 [Thread-13-count-bolt] INFO  backtype.storm.task.ShellBolt - Start checking heartbeat...
8706 [Thread-13-count-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt count-bolt:(3)
8708 [Thread-15-count-bolt] INFO  backtype.storm.task.ShellBolt - Start checking heartbeat...
8708 [Thread-15-count-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt count-bolt:(4)
8708 [Thread-16-word-spout] INFO  backtype.storm.daemon.executor - Opened spout word-spout:(5)
8715 [Thread-16-word-spout] INFO  backtype.storm.daemon.executor - Activating spout word-spout:(5)
8715 [Thread-16-word-spout] INFO  backtype.storm.spout.ShellSpout - Start checking heartbeat...
  • Write some order records to the file monitored by Logstash (10.10.10.11)
echo '{"order_id":1, "price":20, "user_name":"Bob", "goods_name":"good_name2"}' > /tmp/orders.log
echo '{"order_id":2, "price":120, "user_name":"Bob", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1120, "user_name":"Bob", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11120, "user_name":"Bob", "goods_name":"good_name3"}' >> /tmp/orders.log
 
echo '{"order_id":1, "price":10, "user_name":"Tom", "goods_name":"good_name2"}' >> /tmp/orders.log
echo '{"order_id":2, "price":110, "user_name":"Tom", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1110, "user_name":"Tom", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11110, "user_name":"Tom", "goods_name":"good_name3"}' >> /tmp/orders.log
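For larger test runs the echo commands can be replaced by a small script. This is a minimal sketch with a hypothetical function name: it appends each order as one JSON line, the same shape the echo commands produce. A temporary file stands in for /tmp/orders.log so the sketch is safe to run anywhere.

```python
import json
import os
import tempfile


def append_orders(path, orders):
    """Append each order to the file as one JSON line."""
    with open(path, "a") as f:
        for order in orders:
            f.write(json.dumps(order) + "\n")


orders = [
    {"order_id": 1, "price": 20, "user_name": "Bob", "goods_name": "good_name2"},
    {"order_id": 2, "price": 120, "user_name": "Bob", "goods_name": "good_name1"},
]

# Stand-in for the file monitored by Logstash
path = os.path.join(tempfile.mkdtemp(), "orders.log")
append_orders(path, orders)

with open(path) as f:
    lines = f.read().splitlines()
print(len(lines))
```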
  • Check the order statistics in MongoDB (10.10.10.12)
[root@normal_12 ~]# /u01/mongodb_27018/client_mongodb.sh 
MongoDB shell version: 3.2.5
connecting to: 10.10.10.12:27018/test
(test) 01:01:10>
(test) 01:01:11> use shop
switched to db shop
(shop) 01:01:16>
(shop) 01:22:32>db.order_stat.find()
{ 
    "_id" : ObjectId("5734bba0172d290f86e2d2e4"), 
    "total_price" : 12380, 
    "min_order_price" : 20, 
    "min_order" : 1, 
    "order_num" : 4, 
    "max_order_price" : 11120, 
    "user_name" : "Bob", 
    "max_order" : 4 
}
{ 
    "_id" : ObjectId("5734bbf1172d290f844d2fdc"), 
    "total_price" : 12230, 
    "min_order_price" : 10, 
    "min_order" : 1, 
    "order_num" : 3, 
    "max_order_price" : 11110, 
    "user_name" : "Tom", 
    "max_order" : 4 
}
  • Finally, submit the project to Storm
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# 
[root@storm_2 wordcount]# sparse submit

At this point we have set up a Storm environment and developed for it using Python.

22.5. Summary

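Many systems are in fact neither purely OLTP nor purely OLAP, but a combination of the two. When OLAP workloads are mixed into an OLTP system, relying on database queries alone can slow the OLTP system down, because the queries become large and complex. Cases like this need to be solved at the architecture level. Storm and Spark are both used for real-time computation today, so if you run into a scenario like the one above, consider giving your system a "new outfit".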


Nickname: HH
QQ: 275258836
  • Published on 07/09/2016 02:19:39
  • When reposting, please keep the link to this article: https://www.ttlsa.com/mysql/mysql-application-architecture-performance-2/