python script demo

Table of Contents

python script 开发   @blog @original

缘由

最近临时接了storm实时分析任务。
至于架构都是大同小异,无非是:
nginx日志 -> 采集日志客户端 -> kafka -> storm -> redis统计 -> mysql存储结果 -> web admin展示
......
详细抽空再整理。

这里先整理 redis统计结果到mysql这一步 的队列处理及告警简单处理。
并无多少业务逻辑跟技术难点,直接贴代码。(会整理提交到github)

整理的理由:
方便以后自己开发py script都可以按这目录结构
并使用 cxfreeze src/main.py --target-dir bin 打包
省事。

目录结构

├── README.txt
├── bin               //打包成二进制文件运行
├── data
│   ├── logging.conf  //日志配置文件
│   ├── product.conf  //生产环境配置文件
│   └── test.conf     //测试环境配置文件
├── logs
│   └── app.log       //应用log
└── src
    ├── base.py       //基类库
    ├── main.py       //主类
    └── todo.py       //TODO

4 directories, 8 files

main.py

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
'''
Entry point: process the queued data and run the alert checks once.

Usage: python2.7 main.py <config_set>
    test    -- test environment configuration
    product -- production environment configuration
'''

import sys

from todo import Todo

# Configuration set used until one is selected on the command line.
_config_set = 'test'


def main(argv):
    '''
    Run one processing pass with the configuration set named in argv[1].

    argv -- full argument vector, program name first (as in sys.argv).

    Returns the process exit status: 0 after a completed pass, 2 when the
    configuration set argument is missing.
    '''
    global _config_set
    if len(argv) < 2:
        # A usage error must not look like success to cron/supervisors,
        # so report on stderr and hand back a non-zero status.
        sys.stderr.write("参数有错,如:'python2.7 main.py test'测试环境 或 "
                         "'python2.7 main.py product'生产环境\n")
        return 2
    _config_set = argv[1].lower()
    todo = Todo(_config_set)
    # Single-argument print() emits the same bytes on Python 2 and 3.
    print("todo start  %s " % todo.getNowDatetime())
    todo.alarm()
    print("todo end    %s \n" % todo.getNowDatetime())
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))

base.py

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
'''
base object
'''


import redis
import ConfigParser
import os
import logging
import logging.config
import datetime
import MySQLdb
import requests
import datetime
import time


class Base(object):
    '''
    Shared infrastructure for the queue-processing scripts.

    Constructing an instance sets up the logger, loads
    ``<config_set>.conf`` from the data directory, and then creates the
    redis client and the MySQL connection described by that configuration.
    '''

    # Suffix appended to the configuration set name.
    _config_file_suffix = '.conf'
    # Directory holding the configuration files (a relative path).
    _config_file_path = '../data/'

    # ConfigParser instance, set by initConfig().
    _cf = None
    # redis.StrictRedis client, set by initRedis().
    _redis = None
    # logging.Logger, set by initLog().
    _logger = None
    # Name of the configuration set to load.
    _config_set = "test"
    # MySQLdb connection, set by initMysql().
    _mysql = None

    def __init__(self, config_set=None):
        '''
        Load the configuration and create every backing client.

        config_set -- name of the configuration file (without suffix);
                      the class default "test" is kept when None.
        '''
        if config_set is not None:
            self._config_set = config_set
        # The logger is created first because initConfig() reports its
        # failures through it.
        self.initLog()
        self.initConfig()
        self.initRedis()
        self.initMysql()

    def _abort(self, message):
        '''
        Report a fatal start-up problem and terminate.

        Logs *message* when a logger already exists, then raises
        SystemExit(message), which prints the message on stderr and exits
        with a non-zero status.
        '''
        if self._logger is not None:
            self._logger.error(message)
        raise SystemExit(message)

    def initMysql(self):
        '''
        Open the MySQL connection from the [mysql] config section
        (host, user, pass, name, charset) unless one already exists.
        '''
        if self._mysql is None:
            self._mysql = MySQLdb.connect(
                host=self._cf.get('mysql', 'host'),
                user=self._cf.get('mysql', 'user'),
                passwd=self._cf.get('mysql', 'pass'),
                db=self._cf.get('mysql', 'name'),
                charset=self._cf.get('mysql', 'charset'))

    def initLog(self):
        '''
        Configure logging from logging.conf in the data directory and bind
        the "bianalysisApp" logger, unless a logger already exists.
        '''
        if self._logger is None:
            logging.config.fileConfig(self._config_file_path + "logging.conf")
            self._logger = logging.getLogger("bianalysisApp")

    def initConfig(self):
        '''
        Parse the configuration file of the selected set into self._cf.

        Terminates the process (SystemExit) when the file is missing or
        cannot be parsed.

        ConfigParser quick reference:
            cf.sections()             -- section names
            cf.options('redis')       -- option names of a section
            cf.items('redis')         -- (name, value) pairs of a section
            cf.get(section, option)   -- a single value
        '''
        configFileName = (self._config_file_path + str(self._config_set)
                          + self._config_file_suffix)
        if not os.path.exists(configFileName):
            self._abort("对不起,配置文件不存在!")
        parser = ConfigParser.ConfigParser()
        try:
            parser.read(configFileName)
        except ConfigParser.Error:
            self._abort("对不起,配置文件不存在!")
        self._cf = parser

    def initRedis(self):
        '''
        Create the redis client from the [redis] config section
        (host, port, db) unless one already exists.
        '''
        if self._redis is None:
            pool = redis.ConnectionPool(host=self._cf.get('redis', 'host'),
                                        port=self._cf.get('redis', 'port'),
                                        db=self._cf.get('redis', 'db'))
            self._redis = redis.StrictRedis(connection_pool=pool)

    def getNowDatetime(self):
        '''Return the current local time as "YYYY-MM-DD HH:MM:SS".'''
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def getRedisListMultiply(self, queueName, count=10):
        '''
        Pop up to *count* entries from the head of a redis list in one
        round trip by running a Lua script on the server.

        queueName -- key of the redis list.
        count     -- maximum number of entries to pop (default 10).

        Returns the popped entries in pop order; an empty list when the
        list is empty or the call fails (the failure is logged).
        '''
        lua = """
        local key = KEYS[1]
        local n = ARGV[1]
        if not n or not key then
            return nil
        end

        local ret = {}
        for i=1,n do
            local v = redis.call("lpop", key)
            if v then
                ret[#ret+1] = v
            else
                break
            end
        end

        return ret
        """
        items = []
        try:
            popBatch = self._redis.register_script(lua)
            # "or []" keeps the documented list return even if the script
            # answers nil.
            items = popBatch(keys=[queueName], args=[count]) or []
        except Exception:
            # Best effort: callers treat a failed fetch like an empty queue.
            self._logger.exception("get queue multiply error")
        return items

    def _postNotice(self, url, postData, channel):
        '''
        POST *postData* to *url* with a 5 second timeout.

        channel -- short label ("rtx" / "phone") used in the log messages.

        Returns True on an HTTP 200 answer, False otherwise; failures are
        logged and never raised.
        '''
        try:
            response = requests.post(url, data=postData, timeout=5)
        except Exception:
            self._logger.exception("send %s except", channel)
            return False
        if response.status_code == requests.codes.ok:
            return True
        self._logger.error("send %s error", channel)
        return False

    def sendRtxContent(self, sendTitle, sendContent):
        '''
        Send an RTX notice using the rtx_* settings of the [base] section.

        Returns True when the server answered HTTP 200, False otherwise.
        '''
        postData = {
            'appId': self._cf.get('base', 'rtx_app_id'),
            'appKey': self._cf.get('base', 'rtx_app_key'),
            'userName': self._cf.get('base', 'rtx_send_users'),
            'title': sendTitle,
            'content': sendContent,
        }
        return self._postNotice(self._cf.get('base', 'rtx_server_url'),
                                postData, 'rtx')

    def sendPhoneContent(self, sendContent):
        '''
        Send a phone (SMS) notice using the phone_* settings of the [base]
        section.

        Returns True when the server answered HTTP 200, False otherwise.
        '''
        postData = {
            'to': self._cf.get('base', 'phone_send_users'),
            'content': sendContent,
        }
        return self._postNotice(self._cf.get('base', 'phone_server_url'),
                                postData, 'phone')

    def getOldTimeStampBySecond(self, oldTimeSecond1, oldTimeSecond2):
        '''
        Return the epoch seconds lying oldTimeSecond1 (inclusive) to
        oldTimeSecond2 (exclusive) seconds before now, newest first.

        The clock is read once so the result is always a run of
        consecutive seconds, even if the wall clock ticks during the call.
        '''
        now = int(time.time())
        return [now - offset
                for offset in range(oldTimeSecond1, oldTimeSecond2)]

todo.py

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
'''
BI业务逻辑处理
'''
import json
import MySQLdb
from base import Base
import time
import datetime
import random

class Todo(Base):
    '''
    BI business logic: moves queued alarm records from redis into MySQL
    and raises RTX / phone alerts.
    '''

    def __init__(self, config_set=None):
        '''
        config_set -- configuration set name, forwarded to Base.
        '''
        Base.__init__(self, config_set)

    def alarm(self):
        '''
        Run one full pass: drain both alarm queues, write the QPS summary,
        then perform the RTX and phone alert checks.
        '''
        self.alarmBase()
        self.alarmQps()
        self.alarmQpsByM()
        # check server
        self.alarmSendRtx()
        self.alarmSendPhone()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _toInt(value, default=0):
        '''Return int(value), or *default* when value is not convertible.'''
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _logMysqlError(self, error):
        '''Log a MySQLdb error as the concatenation of its arguments.'''
        self._logger.error("".join(str(arg) for arg in error.args))

    def _executeWrite(self, sql, params, many=False):
        '''
        Run one parameterized write statement and commit it.

        sql    -- statement using %s placeholders.
        params -- one parameter tuple, or a list of tuples when many=True.

        Returns True after a successful commit; on a MySQLdb error the
        error is logged, the transaction rolled back and False returned.
        '''
        cursor = self._mysql.cursor()
        try:
            if many:
                cursor.executemany(sql, params)
            else:
                cursor.execute(sql, params)
            self._mysql.commit()
            return True
        except MySQLdb.Error as e:
            self._logMysqlError(e)
            try:
                self._mysql.rollback()
            except MySQLdb.Error as rollbackError:
                # The original failure is already logged; a failing
                # rollback is only recorded, not raised.
                self._logMysqlError(rollbackError)
            return False
        finally:
            cursor.close()

    # ------------------------------------------------------------------
    # alert checks
    # ------------------------------------------------------------------

    def alarmSendPhone(self):
        '''
        Phone alert: send a notice for each alarm queue whose length
        exceeds the phone_send_check_nums threshold of the [base] section.
        '''
        threshold = int(self._cf.get('base', 'phone_send_check_nums'))
        for queueNameKey in ('redis_alarm_queue_name_qps',
                             'redis_alarm_queue_name_base'):
            redis_alarm_queue_name = self._cf.get('bianalysis', queueNameKey)
            if int(self._redis.llen(redis_alarm_queue_name)) > threshold:
                self.sendPhoneContent("实时预警队列有积压未处理,请注意!")

    def alarmSendRtx(self):
        '''
        RTX alert: at selected minutes of the hour, count the rows stored
        in tm_storm_statistics for the previous hour and send an RTX
        notice when the count reaches rtx_send_check_nums.
        '''
        # Read the clock once so the minute test and both hour boundaries
        # refer to the same instant.
        now = time.time()
        # NOTE(review): the first slot is '01', not '00' - looks deliberate
        # (let the hour finish first); confirm with the author.
        checkList = ('01', '10', '20', '30', '40', '50')
        if time.strftime('%M', time.localtime(now)) not in checkList:
            return

        hourFormat = '%Y-%m-%d %H'
        beforeTimeHour = time.strftime(
            hourFormat, time.localtime(now - 60 * 60)) + ':00:00'
        nowTimeHour = time.strftime(
            hourFormat, time.localtime(now)) + ':00:00'
        sql = ("SELECT COUNT(*) AS NUMS FROM `tm_storm_statistics` WHERE "
               "alarmStatisticsDatetime > %s "
               "and alarmStatisticsDatetime <= %s")
        try:
            cursor = self._mysql.cursor()
            try:
                cursor.execute(sql, (beforeTimeHour, nowTimeHour))
                result = cursor.fetchone()
            finally:
                cursor.close()
        except MySQLdb.Error as e:
            self._logMysqlError(e)
            return

        nums = int(result[0]) if result and result[0] else 0
        if nums >= int(self._cf.get('base', 'rtx_send_check_nums')):
            self.sendRtxContent("统计告警", "统计上一个小时超过阀值太多,请注意!")

    # ------------------------------------------------------------------
    # queue processing
    # ------------------------------------------------------------------

    def alarmTodo(self, queueName):
        '''
        Pop one batch from the redis list *queueName* and store every
        entry in MySQL.

        Each entry is a JSON object, for example:
        {"alarmName":"","alarmType":1,"alarmVal":5000,
         "alarmDatetime":"2015-05-01 00:00:00","alarmServerId":3}

        Entries that are not JSON objects are logged and skipped so one
        bad record cannot discard the rest of the already-popped batch.
        '''
        items = self.getRedisListMultiply(queueName)
        if not items:
            self._logger.info("queue is null")
            return
        for item in items:
            try:
                queuePopJson = json.loads(item)
            except ValueError:
                self._logger.error("invalid queue item: %s", item)
                continue
            if not isinstance(queuePopJson, dict):
                self._logger.error("invalid queue item: %s", item)
                continue
            # Echo of the decoded record on stdout, kept from the original.
            print(queuePopJson)
            self.writeToMysql(queuePopJson)
            self._logger.info(item)

    def alarmQps(self):
        '''Process the queue configured as redis_alarm_queue_name_qps.'''
        redis_alarm_queue_name = self._cf.get('bianalysis',
                                              'redis_alarm_queue_name_qps')
        self.alarmTodo(redis_alarm_queue_name)

    def alarmBase(self):
        '''
        Process the queue configured as redis_alarm_queue_name_base
        (the original requirement).
        '''
        redis_alarm_queue_name = self._cf.get('bianalysis',
                                              'redis_alarm_queue_name_base')
        self.alarmTodo(redis_alarm_queue_name)

    def alarmQpsByM(self):
        '''
        Write one qps_statistics_minute row per member of the redis set
        'qps:all:lists'.

        For each member the per-second counters stored under
        "<member><epoch second>" are summed over the window configured by
        total_old_time_start / total_old_time_end and divided by 60.
        Members are split on ":"; field 2 is stored as ip and field 1 as
        serverid.
        NOTE(review): the fixed divisor presumably assumes a 60 second
        window - confirm against the configuration.
        '''
        qps_ip_lists = self._redis.smembers('qps:all:lists')
        if not qps_ip_lists:
            return

        # The window is the same for every member, so compute it once.
        stamps = self.getOldTimeStampBySecond(
            int(self._cf.get('bianalysis', 'total_old_time_start')),
            int(self._cf.get('bianalysis', 'total_old_time_end')))
        if not stamps:
            self._logger.error("qps statistics window is empty")
            return
        statisticsTime = time.strftime("%Y-%m-%d %H:%M:00",
                                       time.localtime(stamps[0]))
        addtime = self.getNowDatetime()

        rows = []
        for member in qps_ip_lists:
            parts = member.split(":")
            if len(parts) < 3:
                self._logger.error("unexpected qps member: %s", member)
                continue
            # One MGET instead of two GETs per second of the window.
            counters = self._redis.mget(
                [str(member) + str(stamp) for stamp in stamps])
            total = sum(int(value) for value in counters
                        if value is not None)
            rows.append((str(parts[2]), str(parts[1]), total // 60,
                         addtime, statisticsTime, ''))
        if not rows:
            return

        sql = ("INSERT INTO `qps_statistics_minute`"
               "(ip,serverid,qps,addtime,statistics_time, notes) "
               "VALUES (%s, %s, %s, %s, %s, %s)")
        self._executeWrite(sql, rows, many=True)

    def writeToMysql(self, queuePopJson):
        '''
        Store one decoded queue record.

        alarmType values:
        1 => client IP alarm
        2 => backend IP alarm
        3 => user ID alarm
        4 => client version alarm
        5 => key: PeerIP+UserID+DeviceID alarm
        6 => requested URL alarm
        7 => QPS statistics
        8 => hxff statistics

        Type 7 goes to `qps_statistics`; every other type goes to
        `tm_storm_statistics`. Values are passed as query parameters, so
        quotes or backslashes in the record cannot alter the statement.
        Missing or non-numeric integer fields are stored as 0.
        '''
        notes = json.dumps(queuePopJson)
        addtime = self.getNowDatetime()
        serverId = self._toInt(queuePopJson.get('alarmServerId'))
        alarmVal = self._toInt(queuePopJson.get('alarmVal'))

        if queuePopJson.get('alarmType') == 7:
            # QPS of the backend machines, split by business.
            sql = ("INSERT INTO `qps_statistics`"
                   "(ip,serverid,qps,addtime,statistics_time, notes) "
                   "VALUES (%s, %s, %s, %s, %s, %s)")
            params = (queuePopJson.get('alarmName'),
                      serverId,
                      alarmVal,
                      addtime,
                      queuePopJson.get('alarmDatetime'),
                      notes)
        else:
            # Records of suspicious traffic.
            sql = ("INSERT INTO `tm_storm_statistics`"
                   "(alarmName,alarmType,alarmVal, alarmNotes, "
                   "alarmStatisticsDatetime, addtime, serverId) "
                   "VALUES (%s, %s, %s, %s, %s, %s, %s)")
            params = (queuePopJson.get('alarmName'),
                      self._toInt(queuePopJson.get('alarmType')),
                      alarmVal,
                      notes,
                      # Fall back to the insert time when the record
                      # carries no timestamp of its own.
                      queuePopJson.get('alarmDatetime') or addtime,
                      addtime,
                      serverId)
        self._executeWrite(sql, params)

Author: josephzeng (josephzeng36@gmail.com)

Last Updated 2015-12-30. Created by Emacs 24.5.1 (Org mode 8.2.10)

Validate