For an introduction to OpenTSDB itself, see the earlier article 《OpenTSDB监控系统的研究和介绍》. tcollector is the set of scripts OpenTSDB uses to collect data from a host. The source lives at: https://github.com/OpenTSDB/tcollector
You can run it directly in dry-run mode, which prints the data points instead of sending them to the TSD:

# python tcollector.py --dry-run -t project=monitor -t host=www.ttlsa.com
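Each collector is just an executable that prints data points to stdout, one per line, in the format parsed by ReaderThread.process_line() in the source below: <metric> <timestamp> <value> [tag1=value1 tag2=value2 ...]. As a hypothetical illustration (the file name, metric name, and placement in collectors/0/ are assumptions for the example, not part of tcollector), a minimal long-running collector could look like this:

#!/usr/bin/python
# loadavg.py -- hypothetical example collector, e.g. collectors/0/loadavg.py.
# It prints one data point per line on stdout in the format tcollector
# expects: "<metric> <timestamp> <value> [tag=value ...]".
import os
import sys
import time

def main():
    # Report the 1-minute load average every 15 seconds.
    while True:
        ts = int(time.time())
        load1 = os.getloadavg()[0]
        # e.g.: proc.loadavg.1min 1361006731 0.42
        print "proc.loadavg.1min %d %.2f" % (ts, load1)
        # tcollector reads our stdout through a non-blocking pipe, so flush.
        sys.stdout.flush()
        time.sleep(15)

if __name__ == '__main__':
    main()

Because interval-0 collectors are kept running and respawned if they die (see spawn_children() below), this script loops forever and flushes after every line.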
Annotated walkthrough of the tcollector.py source:
#!/usr/bin/python
# coding: utf-8
import atexit
import errno
import fcntl
import logging
import os
import random
import re
import signal
import socket
import subprocess
import sys
import threading
import time
from logging.handlers import RotatingFileHandler
from Queue import Queue
from Queue import Empty
from Queue import Full
from optparse import OptionParser

# global variables.
COLLECTORS = {}
GENERATION = 0
DEFAULT_LOG = '/var/log/tcollector.log'
LOG = logging.getLogger('tcollector')
ALIVE = True
MAX_UNCAUGHT_EXCEPTIONS = 100


def register_collector(collector):
    """
    Store collector in the global COLLECTORS dict, keyed by collector.name,
    so the instances can be looked up again later.
    :param collector:
    :return:
    """
    # collector must be an instance of Collector; raise AssertionError otherwise.
    assert isinstance(collector, Collector), "collector=%r" % (collector,)
    # If a collector with this name is already registered, check col.proc to
    # see whether it is still busy collecting data; if so, shut it down first.
    if collector.name in COLLECTORS:
        col = COLLECTORS[collector.name]
        if col.proc is not None:
            LOG.error('%s still has a process (pid=%d) and is being reset,'
                      'terminating', col.name, col.proc.pid)
            col.shutdown()
    # Register (or replace) the entry in the global dict.
    COLLECTORS[collector.name] = collector


class ReaderQueue(Queue):
    """
    A Queue subclass whose nput method is used to enqueue data.
    """

    def nput(self, value):
        """
        Non-blocking put: if the queue is full, raise Queue.Full immediately
        instead of waiting for a free slot the way a plain put would.
        :param value: the data to enqueue
        :return: True if the value was enqueued, False otherwise
        """
        try:
            # False makes the put non-blocking.
            self.put(value, False)
        except Full:
            LOG.error("DROPPED LINE: %s", value)
            return False
        return True


class Collector(object):
    """
    A collector: manages a child process and reads data from it.
    """

    def __init__(self, colname, interval, filename, mtime=0, lastspawn=0):
        """Construct a new Collector."""
        self.name = colname
        self.interval = interval
        self.filename = filename
        self.lastspawn = lastspawn
        # self.proc holds a subprocess.Popen object while the collector runs.
        self.proc = None
        self.nextkill = 0
        self.killstate = 0
        self.dead = False
        self.mtime = mtime
        self.generation = GENERATION
        # Buffer for the raw bytes read from the child process.
        self.buffer = ""
        # Complete lines split out of self.buffer on "\n".
        self.datalines = []
        self.values = {}
        self.lines_sent = 0
        self.lines_received = 0
        self.lines_invalid = 0

    def read(self):
        """
        Read what the subprocess has printed: stderr is logged and otherwise
        ignored; only stdout is collected as data.
        :return:
        """
        # Read the child's stderr; if there is anything, log it and move on.
        try:
            out = self.proc.stderr.read()
            if out:
                LOG.debug('reading %s got %d bytes on stderr',
                          self.name, len(out))
                for line in out.splitlines():
                    LOG.warning('%s: %s', self.name, line)
        except IOError, (err, msg):
            if err != errno.EAGAIN:
                raise
        except:
            LOG.exception('uncaught exception in stderr read')

        # Read the child's stdout and append it to self.buffer.
        try:
            self.buffer += self.proc.stdout.read()
            if len(self.buffer):
                LOG.debug('reading %s, buffer now %d bytes',
                          self.name, len(self.buffer))
        except IOError, (err, msg):
            if err != errno.EAGAIN:
                raise
        except:
            LOG.exception('uncaught exception in stdout read')
            return

        # Split self.buffer into lines and store them in self.datalines.
        while self.buffer:
            idx = self.buffer.find('\n')
            # No newline yet: wait for more data.
            if idx == -1:
                break
            # Take one line at a time (admittedly not very pythonic).
            line = self.buffer[0:idx].strip()
            if line:
                self.datalines.append(line)
            # Trim the consumed line off the buffer on every pass.
            self.buffer = self.buffer[idx+1:]

    def collect(self):
        """
        Yield the lines that read() stored in self.datalines, so a caller can
        simply iterate over collect() with a for loop.
        :return:
        """
        while self.proc is not None:
            self.read()
            if not len(self.datalines):
                return
            while len(self.datalines):
                yield self.datalines.pop(0)

    def shutdown(self):
        """
        Kill the self.proc child process.
        :return:
        """
        if not self.proc:
            return
        try:
            # poll() returns None while self.proc is still running, and the
            # process's exit code once it has finished.
            if self.proc.poll() is None:
                # Still running: kill it; the default signal is SIGTERM.
                kill(self.proc)
                for attempt in range(5):
                    if self.proc.poll() is not None:
                        return
                    LOG.info('Waiting %ds for PID %d to exit...'
                             % (5 - attempt, self.proc.pid))
                    time.sleep(1)
                # Five gentle attempts failed, so resort to force: SIGKILL
                # lets the OS terminate it, no questions asked.
                kill(self.proc, signal.SIGKILL)
                self.proc.wait()
        except:
            LOG.exception('ignoring uncaught exception while shutting down')

    def evict_old_keys(self, cut_off):
        """
        Delete cached values whose timestamp is older than cut_off.
        :param cut_off: a UNIX timestamp
        :return:
        """
        for key in self.values.keys():
            time = self.values[key][3]
            if time < cut_off:
                del self.values[key]


class StdinCollector(Collector):
    """
    A collector that reads its data from stdin.  It is much simpler, since
    self.proc does not need to point to any real process.
    """

    def __init__(self):
        # Call the parent __init__; note the three arguments passed.
        super(StdinCollector, self).__init__('stdin', 0, '<stdin>')
        # Data comes from stdin, so self.proc is never used as a process;
        # any truthy value would do here, it just isn't important.
        self.proc = True

    def read(self):
        """
        Read data from stdin; sys.stdin.readline blocks until data arrives.
        :return:
        """
        global ALIVE
        line = sys.stdin.readline()
        if line:
            self.datalines.append(line.rstrip())
        else:
            ALIVE = False

    def shutdown(self):
        # self.proc has no real process behind it, so the parent's shutdown
        # is meaningless here; override it with a no-op.
        pass


class ReaderThread(threading.Thread):

    def __init__(self, dedupinterval, evictinterval):
        """
        Thread that pulls data from the collectors and puts it on a ReaderQueue.
        :param dedupinterval: window (seconds) within which repeated identical
            datapoints are not sent
        :param evictinterval: age (seconds) after which cached, unsent
            datapoints are deleted
        :return:
        """
        # evictinterval must exceed dedupinterval, otherwise suppressed
        # datapoints would be evicted before they could ever be sent.
        assert evictinterval > dedupinterval, "%r <= %r" % (evictinterval,
                                                            dedupinterval)
        # Run threading.Thread's initializer, __init__.
        super(ReaderThread, self).__init__()
        # A queue that can hold up to 100k datapoints.
        self.readerq = ReaderQueue(100000)
        self.lines_collected = 0
        self.lines_dropped = 0
        self.dedupinterval = dedupinterval
        self.evictinterval = evictinterval

    def run(self):
        LOG.debug("ReaderThread up and running")
        lastevict_time = 0
        while ALIVE:
            for col in all_living_collectors():
                for line in col.collect():
                    # Process each line obtained from the collector and push
                    # it onto the self.readerq queue.
                    self.process_line(col, line)
            # Evict datapoints that are so old that, having not been sent
            # back then, there is no point in sending them now.
            now = int(time.time())
            if now - lastevict_time > self.evictinterval:
                lastevict_time = now
                now -= self.evictinterval
                for col in all_collectors():
                    col.evict_old_keys(now)
            # Pause 1s at the end of every pass.
            time.sleep(1)

    def process_line(self, col, line):
        """
        Process one line obtained from a collector.
        :param col:
        :param line:
        :return:
        """
        # One more line received from this collector: bump lines_received.
        col.lines_received += 1
        # Length limit, imposed by opentsdb.tsd.PipelineFactory.
        if len(line) >= 1024:
            LOG.warning('%s line too long: %s', col.name, line)
            # One more invalid line from this collector.
            col.lines_invalid += 1
            return
        # The line format must be: <metric> <timestamp> <value> [tag1 tag2 ...]
        # where each tag is any k=v pair, e.g.:
        #   mccq.cpu.average 1361006731 15 host=mc_s1_192.168.1.1 project=m1
        parsed = re.match('^([-_./a-zA-Z0-9]+)\s+'  # Metric name.
                          '(\d+)\s+'                # Timestamp.
                          '(\S+?)'                  # Value (int or float).
                          '((?:\s+[-_./a-zA-Z0-9]+=[-_./a-zA-Z0-9]+)*)$',  # Tags
                          line)
        # If the line doesn't match, log an error and bump lines_invalid.
        if parsed is None:
            LOG.warning('%s sent invalid data: %s', col.name, line)
            col.lines_invalid += 1
            return
        metric, timestamp, value, tags = parsed.groups()
        timestamp = int(timestamp)
        key = (metric, tags)
        if key in col.values:
            # If the timestamp isn't newer than the previous datapoint,
            # ignore this line.
            if timestamp <= col.values[key][3]:
                LOG.error("Timestamp out of order: metric=%s%s,"
                          " old_ts=%d >= new_ts=%d - ignoring data point"
                          " (value=%r, collector=%s)", metric, tags,
                          col.values[key][3], timestamp, value, col.name)
                col.lines_invalid += 1
                return
            # If the value equals the previous one and the timestamps differ
            # by less than self.dedupinterval, don't send this datapoint;
            # just cache it with the suppressed flag set to True.
            if (col.values[key][0] == value and
                (timestamp - col.values[key][3] < self.dedupinterval)):
                col.values[key] = (value, True, line, col.values[key][3])
                return
            # To keep the OpenTSDB graphs continuous, first send the datapoint
            # that was previously held back because its value was unchanged
            # within the dedup window.
            if ((col.values[key][1] or
                 (timestamp - col.values[key][3] >= self.dedupinterval))
                    and col.values[key][0] != value):
                # One more line sent by this collector.
                col.lines_sent += 1
                if not self.readerq.nput(col.values[key][2]):
                    # The line was offered to the queue but dropped.
                    self.lines_dropped += 1
        # Cache this datapoint in the collector's values dict, and send it.
        col.values[key] = (value, False, line, timestamp)
        col.lines_sent += 1
        if not self.readerq.nput(line):
            self.lines_dropped += 1


class SenderThread(threading.Thread):

    def __init__(self, reader, dryrun, host, port, self_report_stats, tags):
        """
        Sends data to the TSD.  Note that this tool does not persist failed
        datapoints anywhere, so data that has been collected but not yet sent
        is lost if the program crashes.
        :param reader: reference to the ReaderThread instance
        :param dryrun: test mode; datapoints are only printed to the terminal
            instead of being sent to the TSD
        :param host: TSD address
        :param port: TSD port
        :param self_report_stats: whether to also send tcollector's own stats
            to the TSD
        :param tags: extra tags
        :return:
        """
        super(SenderThread, self).__init__()
        self.dryrun = dryrun
        self.host = host
        self.port = port
        self.reader = reader
        self.tagstr = tags
        self.tsd = None
        self.last_verify = 0
        self.sendq = []
        self.self_report_stats = self_report_stats

    def run(self):
        """
        Send data to the TSD in a loop: wait up to 5s for the first line,
        then sleep another 5s so more lines can accumulate, drain whatever
        is queued, and send it all at once.
        """
        errors = 0
        while ALIVE:
            try:
                # Make sure the connection to the TSD is up.
                self.maintain_conn()
                try:
                    # Blocking get from the reader's queue with a 5s timeout;
                    # raises Queue.Empty if the queue stays empty.
                    line = self.reader.readerq.get(True, 5)
                except Empty:
                    continue
                self.sendq.append(line)
                # Pause 5s to let more data accumulate.
                time.sleep(5)
                while True:
                    try:
                        # Drain the reader's queue, non-blocking this time;
                        # raises Queue.Empty as soon as it runs dry.
                        line = self.reader.readerq.get(False)
                    except Empty:
                        break
                    self.sendq.append(line)
                # Send everything queued in self.sendq.
                self.send_data()
                errors = 0
            except (ArithmeticError, EOFError, EnvironmentError, LookupError,
                    ValueError), e:
                errors += 1
                # Exit once more than MAX_UNCAUGHT_EXCEPTIONS have occurred.
                if errors > MAX_UNCAUGHT_EXCEPTIONS:
                    shutdown()
                    raise
                LOG.exception('Uncaught exception in SenderThread, ignoring')
                time.sleep(1)
                continue
            except:
                LOG.exception('Uncaught exception in SenderThread, going to exit')
                # Exit on unknown errors too.
                shutdown()
                raise

    def verify_conn(self):
        """
        Periodically check the connection to the TSD, and that the TSD is
        working properly.
        """
        if self.tsd is None:
            return False
        # If the last check was less than 60s ago, skip it and report OK.
        if self.last_verify > time.time() - 60:
            return True
        LOG.debug('verifying our TSD connection is alive')
        try:
            # Probe the connection by sending the "version" command.
            self.tsd.sendall('version\n')
        except socket.error, msg:
            self.tsd = None
            return False
        bufsize = 4096
        while ALIVE:
            # Read as much of the TSD's response as possible.  recv blocks,
            # but a timeout has already been set on the connection.
            try:
                buf = self.tsd.recv(bufsize)
            except socket.error, msg:
                self.tsd = None
                return False
            # No response to `version` means the TSD is dead.
            if not buf:
                self.tsd = None
                return False
            # Greedy: if this read filled the whole buffer, continue and read
            # again, until the response is fully drained.
            if len(buf) == bufsize:
                continue
            # If self-reported stats are enabled, queue some collection
            # statistics while the connection is known to be good: first the
            # reader's totals...
            if self.self_report_stats:
                strs = [
                    ('reader.lines_collected', '', self.reader.lines_collected),
                    ('reader.lines_dropped', '', self.reader.lines_dropped)
                ]
                # ...then each collector's send/receive/invalid counters.
                for col in all_living_collectors():
                    strs.append(('collector.lines_sent',
                                 'collector=' + col.name, col.lines_sent))
                    strs.append(('collector.lines_received',
                                 'collector=' + col.name, col.lines_received))
                    strs.append(('collector.lines_invalid',
                                 'collector=' + col.name, col.lines_invalid))
                ts = int(time.time())
                # Each entry looks like, e.g.:
                #   tcollector.collector.lines_sent 1361029499 80 collector=system.io.aver
                strout = ["tcollector.%s %d %d %s"
                          % (x[0], ts, x[2], x[1]) for x in strs]
                for string in strout:
                    self.sendq.append(string)
            # Leave the read loop.
            break
        # Getting here means the connection looks fine; refresh self.last_verify.
        self.last_verify = time.time()
        return True

    def maintain_conn(self):
        """
        Maintain the connection from the collector to the TSD.
        :return:
        """
        # In dryrun mode there is no connection to check.
        if self.dryrun:
            return
        # Verify the connection and keep retrying until it is healthy.
        try_delay = 1
        while ALIVE:
            if self.verify_conn():
                return
            # Backoff: the delay grows gradually and is capped around
            # 10 minutes.
            try_delay *= 1 + random.random()
            if try_delay > 600:
                try_delay *= 0.5
            LOG.debug('SenderThread blocking %0.2f seconds', try_delay)
            time.sleep(try_delay)
            # Resolve host/port into the 5-tuples needed to create a socket;
            # the fifth element, sockaddr, is an (address, port) pair.
            adresses = socket.getaddrinfo(self.host, self.port,
                                          socket.AF_UNSPEC,
                                          socket.SOCK_STREAM, 0)
            for family, socktype, proto, canonname, sockaddr in adresses:
                try:
                    # Create the socket object...
                    self.tsd = socket.socket(family, socktype, proto)
                    # ...set a 15 second timeout...
                    self.tsd.settimeout(15)
                    # ...and connect to the TSD service.
                    self.tsd.connect(sockaddr)
                    # Getting here means the connection succeeded; stop
                    # trying further addresses.
                    break
                except socket.error, msg:
                    LOG.warning('Connection attempt failed to %s:%d: %s',
                                self.host, self.port, msg)
                self.tsd.close()
                self.tsd = None
            if not self.tsd:
                LOG.error('Failed to connect to %s:%d', self.host, self.port)

    def send_data(self):
        """
        Send the data queued in self.sendq to the TSD.
        """
        # Build the payload to send.
        out = ''
        for line in self.sendq:
            line = 'put ' + line + self.tagstr
            out += line + '\n'
            LOG.debug('SENDING: %s', line)
        if not out:
            LOG.debug('send_data no data?')
            return
        # Send the data.  This part could be improved: if sendall fails,
        # the queued datapoints are simply lost.
        try:
            if self.dryrun:
                print out
            else:
                self.tsd.sendall(out)
            self.sendq = []
        except socket.error, msg:
            LOG.error('failed to send data: %s', msg)
            try:
                self.tsd.close()
            except socket.error:
                pass
            self.tsd = None


def setup_logging(logfile=DEFAULT_LOG, max_bytes=None, backup_count=None):
    """
    Configure logging.
    :param logfile: the file the log is written to
    :param max_bytes: maximum size of a logfile
    :param backup_count: number of logfile backups to keep
    :return:
    """
    # Log at INFO level.
    LOG.setLevel(logging.INFO)
    if backup_count is not None and max_bytes is not None:
        assert backup_count > 0
        assert max_bytes > 0
        ch = RotatingFileHandler(logfile, 'a', max_bytes, backup_count)
    else:
        ch = logging.StreamHandler(sys.stdout)
    # Log record format.
    ch.setFormatter(logging.Formatter('%(asctime)s %(name)s[%(process)d] '
                                      '%(levelname)s: %(message)s'))
    LOG.addHandler(ch)


def parse_cmdline(argv):
    """
    Define and parse the script's command line options.
    :param argv:
    :return:
    """
    # Default collectors directory, next to this script.
    default_cdir = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])),
                                'collectors')
    # Declare the options.
    parser = OptionParser(description='Manages collectors which gather '
                          'data and report back.')
    parser.add_option('-c', '--collector-dir', dest='cdir', metavar='DIR',
                      default=default_cdir,
                      help='Directory where the collectors are located.')
    parser.add_option('-d', '--dry-run', dest='dryrun', action='store_true',
                      default=False,
                      help='Don\'t actually send anything to the TSD, '
                      'just print the datapoints.')
    parser.add_option('-D', '--daemonize', dest='daemonize',
                      action='store_true', default=False,
                      help='Run as a background daemon.')
    parser.add_option('-H', '--host', dest='host', default='localhost',
                      metavar='HOST',
                      help='Hostname to use to connect to the TSD.')
    parser.add_option('--no-tcollector-stats', dest='no_tcollector_stats',
                      default=False, action='store_true',
                      help='Prevent tcollector from reporting its own stats to TSD')
    parser.add_option('-s', '--stdin', dest='stdin', action='store_true',
                      default=False,
                      help='Run once, read and dedup data points from stdin.')
    parser.add_option('-p', '--port', dest='port', type='int', default=4242,
                      metavar='PORT',
                      help='Port to connect to the TSD instance on. '
                      'default=%default')
    parser.add_option('-v', dest='verbose', action='store_true', default=False,
                      help='Verbose mode (log debug messages).')
    parser.add_option('-t', '--tag', dest='tags', action='append',
                      default=[], metavar='TAG',
                      help='Tags to append to all timeseries we send, '
                      'e.g.: -t TAG=VALUE -t TAG2=VALUE')
    parser.add_option('-P', '--pidfile', dest='pidfile',
                      default='/var/run/tcollector.pid',
                      metavar='FILE', help='Write our pidfile')
    parser.add_option('--dedup-interval', dest='dedupinterval', type='int',
                      default=300, metavar='DEDUPINTERVAL',
                      help='Number of seconds in which successive duplicate '
                      'datapoints are suppressed before sending to the TSD. '
                      'default=%default')
    parser.add_option('--evict-interval', dest='evictinterval', type='int',
                      default=6000, metavar='EVICTINTERVAL',
                      help='Number of seconds after which to remove cached '
                      'values of old data points to save memory. '
                      'default=%default')
    parser.add_option('--max-bytes', dest='max_bytes', type='int',
                      default=64 * 1024 * 1024,
                      help='Maximum bytes per a logfile.')
    parser.add_option('--backup-count', dest='backup_count', type='int',
                      default=0, help='Maximum number of logfiles to backup.')
    parser.add_option('--logfile', dest='logfile', type='str',
                      default=DEFAULT_LOG,
                      help='Filename where logs are written to.')
    (options, args) = parser.parse_args(args=argv[1:])
    if options.dedupinterval < 2:
        parser.error('--dedup-interval must be at least 2 seconds')
    if options.evictinterval <= options.dedupinterval:
        parser.error('--evict-interval must be strictly greater than '
                     '--dedup-interval')
    if options.daemonize and not options.backup_count:
        options.backup_count = 1
    return (options, args)


def daemonize():
    """
    The classic Python daemonization recipe: fork twice, exit twice.
    :return:
    """
    if os.fork():
        os._exit(0)
    os.chdir("/")
    os.umask(022)
    os.setsid()
    os.umask(0)
    if os.fork():
        os._exit(0)
    stdin = open(os.devnull)
    stdout = open(os.devnull, 'w')
    os.dup2(stdin.fileno(), 0)
    os.dup2(stdout.fileno(), 1)
    os.dup2(stdout.fileno(), 2)
    stdin.close()
    stdout.close()
    for fd in xrange(3, 1024):
        try:
            os.close(fd)
        except OSError:
            pass


def main(argv):
    """
    The main program starts here.
    :param argv:
    :return:
    """
    # Parse the options and arguments.
    options, args = parse_cmdline(argv)
    # If the daemon option was given, go into daemon mode via daemonize.
    if options.daemonize:
        daemonize()
    # Configure logging.
    setup_logging(options.logfile, options.max_bytes or None,
                  options.backup_count or None)
    # Verbose mode means debug-level logging.
    if options.verbose:
        LOG.setLevel(logging.DEBUG)
    # Write the pidfile.
    if options.pidfile:
        write_pid(options.pidfile)
    # Check that the tags are well-formed.
    tags = {}
    for tag in options.tags:
        if re.match('^[-_.a-z0-9]+=\S+$', tag, re.IGNORECASE) is None:
            assert False, 'Tag string "%s" is invalid.' % tag
        k, v = tag.split('=', 1)
        if k in tags:
            assert False, 'Tag "%s" already declared.' % k
        tags[k] = v

    options.cdir = os.path.realpath(options.cdir)
    if not os.path.isdir(options.cdir):
        LOG.fatal('No such directory: %s', options.cdir)
        return 1
    # Load the config modules.
    modules = load_etc_dir(options, tags)

    # Default the host tag if it wasn't given.
    if not 'host' in tags and not options.stdin:
        tags['host'] = socket.gethostname()
        LOG.warning('Tag "host" not specified, defaulting to %s.',
                    tags['host'])

    # Build the tag string, space separated.
    tagstr = ''
    if tags:
        tagstr = ' '.join('%s=%s' % (k, v) for k, v in tags.iteritems())
        tagstr = ' ' + tagstr.strip()

    # On exit and on signals, shut the collectors down.
    atexit.register(shutdown)
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, shutdown_signal)

    # Create the thread that gathers the data...
    reader = ReaderThread(options.dedupinterval, options.evictinterval)
    # ...and start it.
    reader.start()

    # Create the thread that sends the data...
    sender = SenderThread(reader, options.dryrun, options.host, options.port,
                          not options.no_tcollector_stats, tagstr)
    # ...and start it too.
    sender.start()
    LOG.info('SenderThread startup complete')

    # With the reader and sender threads running, start the actual collectors.
    # When collecting from stdin, register a StdinCollector instance in the
    # global collectors dict via register_collector.
    if options.stdin:
        register_collector(StdinCollector())
        # Enter the stdin loop.
        stdin_loop(options, modules, sender, tags)
    else:
        sys.stdin.close()
        # Not in stdin mode: run the collection logic of our modules.
        main_loop(options, modules, sender, tags)

    LOG.debug('Shutting down -- joining the reader thread.')
    # Wait for the reader to exit...
    reader.join()
    LOG.debug('Shutting down -- joining the sender thread.')
    # ...and for the sender.
    sender.join()


def stdin_loop(options, modules, sender, tags):
    """
    The loop for stdin mode.  In essence its only purpose is to keep the main
    thread busy so the thread reading stdin can keep gathering data;
    it does nothing else.
    :param options:
    :param modules:
    :param sender:
    :param tags:
    :return:
    """
    global ALIVE
    next_heartbeat = int(time.time() + 600)
    while ALIVE:
        time.sleep(15)
        reload_changed_config_modules(modules, options, sender, tags)
        now = int(time.time())
        if now >= next_heartbeat:
            LOG.info('Heartbeat (%d collectors running)'
                     % sum(1 for col in all_living_collectors()))
            next_heartbeat = now + 600


def main_loop(options, modules, sender, tags):
    """
    Run the collector modules.
    :param options: parsed options
    :param modules: the module list
    :param sender: the sender thread
    :param tags: user-defined tags
    :return:
    """
    next_heartbeat = int(time.time() + 600)
    while True:
        # Update or register the collectors for the modules.
        populate_collectors(options.cdir)
        # Reload modules whose files have changed.
        reload_changed_config_modules(modules, options, sender, tags)
        reap_children()
        spawn_children()
        time.sleep(15)
        # Collector heartbeat, once every 10 minutes.
        now = int(time.time())
        if now >= next_heartbeat:
            LOG.info('Heartbeat (%d collectors running)'
                     % sum(1 for col in all_living_collectors()))
            next_heartbeat = now + 600


def list_config_modules(etcdir):
    """
    List the Python files in a directory.
    :param etcdir:
    :return:
    """
    if not os.path.isdir(etcdir):
        return iter(())  # Empty iterator.
    return (name for name in os.listdir(etcdir)
            if (name.endswith('.py')
                and os.path.isfile(os.path.join(etcdir, name))))


def load_etc_dir(options, tags):
    """
    Load the modules under etc.
    :param options:
    :param tags:
    :return:
    """
    etcdir = os.path.join(options.cdir, 'etc')
    # Add etc to the Python module search path.
    sys.path.append(etcdir)
    modules = {}
    # Load every module in the etc directory.
    for name in list_config_modules(etcdir):
        path = os.path.join(etcdir, name)
        # Load the module...
        module = load_config_module(name, options, tags)
        # ...and store it in the modules dict; note the key/value layout.
        modules[path] = (module, os.path.getmtime(path))
    return modules


def load_config_module(name, options, tags):
    """
    Load one module.
    :param name: the module's name (or a module object to reload)
    :param options:
    :param tags:
    :return:
    """
    if isinstance(name, str):
        LOG.info('Loading %s', name)
        d = {}
        # Strip the trailing .py
        module = __import__(name[:-3], d, d)
    else:
        module = reload(name)
    # If the module has an onload function, call it; it can initialize
    # some module-level state.
    onload = module.__dict__.get('onload')
    if callable(onload):
        try:
            onload(options, tags)
        except:
            LOG.fatal('Exception while loading %s', name)
            raise
    return module


def reload_changed_config_modules(modules, options, sender, tags):
    """
    Reload the modules that have changed.
    :param modules:
    :param options:
    :param sender:
    :param tags:
    :return:
    """
    etcdir = os.path.join(options.cdir, 'etc')
    current_modules = set(list_config_modules(etcdir))
    current_paths = set(os.path.join(etcdir, name)
                        for name in current_modules)
    changed = False
    # Reload modules whose mtime on disk is newer than the cached one.
    for path, (module, timestamp) in modules.iteritems():
        if path not in current_paths:
            continue
        mtime = os.path.getmtime(path)
        if mtime > timestamp:
            LOG.info('Reloading %s, file has changed', path)
            module = load_config_module(module, options, tags)
            modules[path] = (module, mtime)
            changed = True
    # Drop modules that no longer exist on disk.
    for path in set(modules).difference(current_paths):
        LOG.info('%s has been removed, tcollector should be restarted', path)
        del modules[path]
        changed = True
    # Check whether new modules have been added; if so, load them.
    for name in current_modules:
        path = os.path.join(etcdir, name)
        if path not in modules:
            module = load_config_module(name, options, tags)
            modules[path] = (module, os.path.getmtime(path))
            changed = True
    if changed:
        sender.tagstr = ' '.join('%s=%s' % (k, v)
                                 for k, v in tags.iteritems())
        sender.tagstr = ' ' + sender.tagstr.strip()
    return changed


def write_pid(pidfile):
    """
    Create the pidfile and write the process id into it.
    :param pidfile:
    :return:
    """
    f = open(pidfile, "w")
    try:
        f.write(str(os.getpid()))
    finally:
        f.close()


def all_collectors():
    """
    Return all collectors.
    :return:
    """
    return COLLECTORS.itervalues()


def all_valid_collectors():
    """
    Return the valid collectors.
    :return:
    """
    now = int(time.time())
    for col in all_collectors():
        # Yield collectors that aren't marked dead, or whose last spawn was
        # more than 3600s ago.
        if not col.dead or (now - col.lastspawn > 3600):
            yield col


def all_living_collectors():
    """
    Return all collectors that are currently running.
    :return:
    """
    for col in all_collectors():
        if col.proc is not None:
            yield col


def shutdown_signal(signum, frame):
    """
    Shut down the whole process.
    :param signum:
    :param frame:
    :return:
    """
    LOG.warning("shutting down, got signal %d", signum)
    shutdown()


def kill(proc, signum=signal.SIGTERM):
    """
    Send the signal signum to the given process.
    :param proc:
    :param signum:
    :return:
    """
    os.kill(proc.pid, signum)


def shutdown():
    """
    Stop the child processes and threads.
    :return:
    """
    global ALIVE
    # Flip the global ALIVE flag so all the loops terminate.
    if not ALIVE:
        return
    ALIVE = False
    LOG.info('shutting down children')
    # Shut down the collectors.
    for col in all_living_collectors():
        col.shutdown()
    LOG.info('exiting')
    sys.exit(1)


def reap_children():
    """
    Check the child processes the collector modules run in, see whether any
    of them have died, and if so decide whether to restart them.
    :return:
    """
    for col in all_living_collectors():
        now = int(time.time())
        status = col.proc.poll()
        if status is None:
            continue
        col.proc = None
        # The child's exit status decides whether the collector gets
        # re-registered.
        if status == 13:
            LOG.info('removing %s from the list of collectors (by request)',
                     col.name)
            col.dead = True
        elif status != 0:
            LOG.warning('collector %s terminated after %d seconds with '
                        'status code %d, marking dead',
                        col.name, now - col.lastspawn, status)
            col.dead = True
        else:
            register_collector(Collector(col.name, col.interval,
                                         col.filename, col.mtime,
                                         col.lastspawn))


def set_nonblocking(fd):
    """
    Put fd into non-blocking mode.
    :param fd:
    :return:
    """
    fl = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, fl)


def spawn_collector(col):
    """
    Run a module's script as a child process.
    :param col:
    :return:
    """
    LOG.info('%s (interval=%d) needs to be spawned', col.name, col.interval)
    try:
        col.proc = subprocess.Popen(col.filename, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
    except OSError, e:
        LOG.error('Failed to spawn collector %s: %s' % (col.filename, e))
        return
    col.lastspawn = int(time.time())
    set_nonblocking(col.proc.stdout.fileno())
    set_nonblocking(col.proc.stderr.fileno())
    if col.proc.pid > 0:
        col.dead = False
        LOG.info('spawned %s (pid=%d)', col.name, col.proc.pid)
        return
    LOG.error('failed to spawn collector: %s', col.filename)


def spawn_children():
    """
    Spawn the child processes that do the actual data collection.
    :return:
    """
    for col in all_valid_collectors():
        now = int(time.time())
        if col.interval == 0:
            if col.proc is None:
                spawn_collector(col)
        elif col.interval <= now - col.lastspawn:
            if col.proc is None:
                spawn_collector(col)
                continue

            # The child has overstayed its interval; escalate from SIGTERM
            # to SIGKILL to a plea for manual intervention.
            if col.nextkill > now:
                continue
            if col.killstate == 0:
                LOG.warning('warning: %s (interval=%d, pid=%d) overstayed '
                            'its welcome, SIGTERM sent',
                            col.name, col.interval, col.proc.pid)
                kill(col.proc)
                col.nextkill = now + 5
                col.killstate = 1
            elif col.killstate == 1:
                LOG.error('error: %s (interval=%d, pid=%d) still not dead, '
                          'SIGKILL sent',
                          col.name, col.interval, col.proc.pid)
                kill(col.proc, signal.SIGKILL)
                col.nextkill = now + 5
                col.killstate = 2
            else:
                LOG.error('error: %s (interval=%d, pid=%d) needs manual '
                          'intervention to kill it',
                          col.name, col.interval, col.proc.pid)
                col.nextkill = now + 300


def populate_collectors(coldir):
    """
    Update existing collectors and register new ones.
    :param coldir: the collectors directory
    :return:
    """
    global GENERATION
    GENERATION += 1
    # Scan the numerically named subdirectories, one per interval.
    for interval in os.listdir(coldir):
        if not interval.isdigit():
            continue
        interval = int(interval)
        # Scan the files inside each interval directory.
        for colname in os.listdir('%s/%d' % (coldir, interval)):
            if colname.startswith('.'):
                continue
            filename = '%s/%d/%s' % (coldir, interval, colname)
            if os.path.isfile(filename) and os.access(filename, os.X_OK):
                # Check the file's modification time.
                mtime = os.path.getmtime(filename)
                if colname in COLLECTORS:
                    col = COLLECTORS[colname]
                    # Two collectors with the same name but different
                    # intervals are not allowed.
                    if col.interval != interval:
                        LOG.error('two collectors with the same name %s and '
                                  'different intervals %d and %d',
                                  colname, interval, col.interval)
                        continue
                    # If the file was modified on disk, re-register
                    # the collector.
                    col.generation = GENERATION
                    if col.mtime < mtime:
                        LOG.info('%s has been updated on disk', col.name)
                        col.mtime = mtime
                        if not col.interval:
                            # For an interval-0 collector, stop the running
                            # instance first.
                            col.shutdown()
                            LOG.info('Respawning %s', col.name)
                            register_collector(Collector(colname, interval,
                                                         filename, mtime))
                else:
                    # Otherwise the collector is new: just register it.
                    register_collector(Collector(colname, interval, filename,
                                                 mtime))
    # Forget the collectors whose files no longer exist.
    to_delete = []
    for col in all_collectors():
        if col.generation < GENERATION:
            LOG.info('collector %s removed from the filesystem, forgetting',
                     col.name)
            col.shutdown()
            to_delete.append(col.name)
    for name in to_delete:
        del COLLECTORS[name]


if __name__ == '__main__':
    sys.exit(main(sys.argv))
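To make the wire format concrete: send_data() above prefixes every queued line with the TSD's telnet-style put command and appends the global tag string. A small standalone sketch of the payload it builds, with made-up metric values and tags:

# Standalone sketch of the payload SenderThread.send_data() constructs.
# The sendq lines and tagstr here are made-up example values.
sendq = [
    'proc.loadavg.1min 1361006731 0.42',
    'proc.loadavg.1min 1361006746 0.40',
]
tagstr = ' project=monitor host=www.ttlsa.com'
out = ''
for line in sendq:
    out += 'put ' + line + tagstr + '\n'
print out,
# Output -- exactly what gets written to the TSD socket:
# put proc.loadavg.1min 1361006731 0.42 project=monitor host=www.ttlsa.com
# put proc.loadavg.1min 1361006746 0.40 project=monitor host=www.ttlsa.com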
Reposted from: http://vcode.org/category/opentsdb/
