MENU

故意踩坑式写脚本——简单检测 TCP 连接

July 21, 2018 • Read: 4628 • 越过长城,代码

墙换成 TCP 阻断的模式以后,就没法想以前那样用多地 ping 的方式检测,只好自己手写一个 TCP 连接来判断。本来可以很轻松的搞定,但写的时候故意放了一些啰嗦的东西在里面,强迫自己多了解几个不同的场景。

成品

日志记录

这是一个非常常见的场景,但是以前的写法都不太好,这次总结一个比较方便也能利用到日志模块功能的写法。

写一个 log 类,比如放在 utils/log.py 里。

import logging
import logging.config
import os

try:
    from settings import LOG_TYPE
except ImportError:
    print('Copy settings.example.py to settings.py first!')
    exit(code=1)


class Logger(object):
    def __init__(self, logger):
        """
        封装 log 类,用来统一获取 logger
        :param logger: 传入 logger 名称,可用 __name__ 代替
        """
        log_file = 'logs/wt.log'
        log_dir = os.path.split(log_file)[0]
        if not os.path.exists(log_dir):
            os.mkdir(log_dir)

        logging_dict = {
            'version': 1,
            'disable_existing_loggers': False,
            'loggers': {
                '': {
                    'level': 'DEBUG',
                    'handlers': LOG_TYPE,
                },
            },
            'formatters': {
                'colored_console': {
                    '()': 'coloredlogs.ColoredFormatter',
                    'format': "[%(asctime)s] [%(name)s] [line:%(lineno)s] [%(levelname)s] %(message)s",
                    'datefmt': '%Y-%m-%d %H:%M:%S',
                },
                'format_for_file': {
                    'format': "[%(asctime)s] [%(name)s] [line:%(lineno)s] [%(levelname)s] %(message)s",
                    'datefmt': '%Y-%m-%d %H:%M:%S',
                }
            },
            'handlers': {
                'console': {
                    'level': 'INFO',
                    'class': 'logging.StreamHandler',
                    'formatter': 'colored_console',
                },
                'file': {
                    'level': 'DEBUG',
                    'class': 'cloghandler.ConcurrentRotatingFileHandler',
                    'formatter': 'format_for_file',
                    'filename': log_file,
                    'encoding': 'utf-8',
                    'delay': True,
                    'maxBytes': 1024 * 1024 * 100,
                    'backupCount': 5,
                }
            }
        }

        logging.config.dictConfig(logging_dict)
        logger = logging.getLogger(logger)

        # requests 不打印 log
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)

        self.logger = logger

    def get_logger(self):
        return self.logger

日志类有两个依赖。

coloredlogs
ConcurrentLogHandler

一个是用来在控制台彩色输出,另一个是用来进程安全地写日志。

首先是创建了存放日志的文件夹,然后在 logging_dict 中,定义了两个 handler,用于控制台和文件输出,两个 handlerformatter 也做了定义,即 colored_consoleformat_for_file

                'colored_console': {
                    '()': 'coloredlogs.ColoredFormatter',
                    'format': "[%(asctime)s] [%(name)s] [line:%(lineno)s] [%(levelname)s] %(message)s",
                    'datefmt': '%Y-%m-%d %H:%M:%S',
                }

这里是为了引用 coloredlogs 自己的类来定义 formatter,颜色输出都是库默认的。

        logging.config.dictConfig(logging_dict)
        logger = logging.getLogger(logger)

        self.logger = logger
    def get_logger(self):
        return self.logger

这里用类初始化的参数定义了 logger 的名字,用 get_logger 方法返回。

        # requests 不打印 log
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)

requests 库会自己开始打日志,很多都没有必要,所以调整一下日志的级别。

在别的地方引入日志类,自行初始化。

logger = Logger(__name__).get_logger()
[2018-07-21 21:32:40] [__main__] [line:58] [INFO] xx.xxx TCP test success.
[2018-07-21 21:32:40] [__main__] [line:48] [INFO] xxxxxxx ICMP test success.
[2018-07-21 21:32:40] [__main__] [line:48] [INFO] xxxxxxxxx ICMP test success.
[2018-07-21 21:32:44] [__main__] [line:32] [ERROR] Notify error: {'errno': 1024, 'errmsg': '不要重复发送同样的内容'}

通知推送

内容复杂的可以用邮件推送,除了 SMTP 以外还可以用一些邮件服务商,比如 MailGun 之类。如果比较简单就可以用 ServerChan,直接推送到微信。

任务调度

本来一个 crontab 就能做的很好的事情,我就是想故意踩个坑,所以用了 Python 来实现,这里用了 apscheduler 库。

apscheduler 分了几个模块,分别是 Triggers,Jobstores,Executors,Schedulers。

Triggers 就是触发器,常用的有 date,interval 和 cron 三种,date 用来执行单次的任务,interval 执行间隔时间的任务,cron 执行的是 crontab 的规则。

Schedulers 是调度器,常用的其实就两种。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler

一种是阻塞式的,另一种是添加了任务后就放到后台运行。

添加任务可以用装饰器的方式。

sched = BlockingScheduler()

@sched.scheduled_job(trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=1), id='first-run')
@sched.scheduled_job(trigger='cron', hour='10,20')
def main():

添加一个单次任务是因为调度的任务在程序跑第一次的时候并不会运行。。不便调试。。。

还可以写一个异常处理,免得接不住异常导致一些奇奇怪怪的问题。

def sched_listener(event):
    """
    apscheduler 事件监听
    :param event:
    :return:
    """
    if event.exception:
        notify('定时任务出错', key=SC_KEY)
        logger.error('定时任务出错 {e}'.format(e=event.exception))


sched.add_listener(sched_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

这样抛了异常也可以推送一条通知过来。

部署

nohup 直接跑也行,我自己用了 supervisor。

[program:wt]
directory=/home/user/wt/
command=/home/user/.pyenv/shims/python3 wt.py
stdout_logfile=none
stderr_logfile=none
user=user
autostart=true
autorestart=true

logfile 配成 none 是因为自己写了日志,不需要再写一遍了,command 里最好用绝对路径。

Archives QR Code
QR Code for this page
Tipping QR Code