1.1. 前言
由于我们这边没有使用 M/R join的方法来实现 IP对应的区域,我们是使用M/R结合Pandas来实现。
我们计算每日PV经过这四个步骤:文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
Mapper: 将以行数据解析成 key=real_ip value=1的形式文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
Shuffle: 通过Shuffle后的结果会生成以 key 的值排序的 value迭代器文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
结果如: real_ip [1, 1, 1 ... 1, 1]文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
Reduce 1: 在这边我们计算出 real_ip 的访问量文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
输出如: None [sum([1, 1, 1 ... 1, 1]), key]文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
Reduce 2:文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
- 初始化 area_ip pandas 数据
- 对sum([1, 1, 1 ... 1, 1]) 进行排序并输出 TOP 100
输入如: 31943 140.205.127.2 浙江省杭州市文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
1.2. 代码
cat mr_uv_real_ip_addr.py #!/usr/bin/env python #-*- coding:utf-8 -*- from mrjob.job import MRJob from mrjob.step import MRStep from mrjob.protocol import RawProtocol from ng_line_parser import NgLineParser import pandas as pd import heapq import socket import struct import sys reload(sys) sys.setdefaultencoding('utf-8') class MRUVRrealIpAddr(MRJob): OUTPUT_PROTOCOL = RawProtocol ng_line_parser = NgLineParser() def mapper(self, _, line): self.ng_line_parser.parse(line) yield self.ng_line_parser.real_ip, 1 def reducer_sum(self, key, values): """统计 VU""" yield None, [str(sum(values)), key] def init_ip_addr_df(self): """读取IP Addr 文件构造 DataFrame 文件""" cols = ['id', 'ip_start_num', 'ip_end_num', 'ip_start', 'ip_end', 'addr', 'operator'] area_ip_path = '/root/script/nginx_log_parse/area_ip.csv' self.ip_addr_df = pd.read_csv(area_ip_path, sep='\t', names=cols, index_col='id') def reducer_top100(self, _, values): """访问数降序""" for cnt, ip in heapq.nlargest(100, values, key=lambda x: int(x[0])): ip_num = -1 try: # 将IP转化成INT/LONG 数字 ip_num = socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0]) # 通过数字获得 地址 DataFrame addr_df = self.ip_addr_df[(self.ip_addr_df.ip_start_num <= ip_num) & (ip_num <= self.ip_addr_df.ip_end_num)] # 通过索引值获得获得 地址 addr = addr_df.at[addr_df.index.tolist()[0], 'addr'] yield cnt, '{ip} {addr}'.format(ip=ip, addr=addr) except: yield cnt, ip def steps(self): return [ MRStep(mapper=self.mapper, reducer=self.reducer_sum), MRStep(reducer_init = self.init_ip_addr_df, reducer=self.reducer_top100) ] def main(): MRUVRrealIpAddr.run() if __name__ == '__main__': main()
运行统计和输出结果文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
python mr_uv_real_ip_addr.py < www.ttmark.com.access.log No configs found; falling back on auto-configuration Creating temp directory /tmp/mr_uv_cdn_ip_addr.root.20160925.023837.331013 Running step 1 of 2... reading from STDIN Running step 2 of 2... Streaming final output from /tmp/mr_uv_cdn_ip_addr.root.20160925.023837.331013/output... 31943 140.205.127.2 浙江省杭州市 26306 101.200.101.203 24667 101.200.101.214 ...... 4065 140.205.253.155 浙江省杭州市 4048 140.205.253.174 浙江省杭州市 3972 140.205.253.131 浙江省杭州市 Removing temp directory /tmp/mr_uv_cdn_ip_addr.root.20160925.023837.331013...
昵称: HH文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
QQ: 275258836文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
ttlsa群交流沟通(QQ群②: 6690706 QQ群③: 168085569 QQ群④: 415230207(新) 微信公众号: ttlsacom)文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
感觉本文内容不错,读后有收获?文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/
逛逛衣服店,鼓励作者写出更好文章。文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/ 文章源自运维生存时间-https://www.ttlsa.com/python/python-big-data-real-ip-addr-mrjob/

评论