分类 Python 下的文章

转拼音多音字的处理

背景

汉字转拼音五笔 最开始的时候选择了粗暴简单的方法,就是在遇到多音字的时候,直接取第一个读音;但是后来同事使用的时候发现多音字的转换效果太差了,于是进行了改造;刚开始的时候使用的php-jieba,但是php在每次request的时候都需要去加载jieba的词库,极其低效;所以选择了使用python来实现逻辑,php通过thrift来调用python的服务

处理流程

piyin.png

根据magnet下载torrent

整体流程

magnet2torrent.png

requirement

# 安装virtualenv
pip install virtualenv
# 创建虚拟环境
virtualenv env
# source
. ./env/bin/activate

安装依赖包
pip install beanstalkc

Client

实现往beanstalkd里面塞hash

import beanstalkc
import sys

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("usage: %s <hash>" % sys.argv[0])
        sys.exit(1)
    if len(sys.argv[1]) != 40:
        print("hash code is error.")
        sys.exit(1)
    beanstalk = beanstalkc.Connection(host = "localhost", port = 11300)
    beanstalk.put(sys.argv[1])
    print("hash has put")

server

依赖libtorrent下载torrent

wget https://github.com/arvidn/libtorrent/releases/download/libtorrent-1_0_10/libtorrent-rasterbar-1.0.10.tar.gz
tar zxf libtorrent-rasterbar-1.0.10.tar.gz
cd libtorrent-rasterbar-1.0.10
./configure --enable-python-binding PYTHON=`which python` --prefix=$VIRTUAL_ENV LIBS='-liconv'
make -j2 && make install

依赖beanstalkd队列

mkdir /usr/local/beanstalkd/bin
wget https://github.com/kr/beanstalkd/archive/v1.10.tar.gz
tar zxf v1.10.tar.gz
cd beanstalkd-1.10/
make
mv beanstalkd /usr/local/beanstalkd/bin

mkdir /data/beanstalkd/
# /usr/local/beanstalkd/bin/beanstalkd -b /data/beanstalkd/

使用supervisor管理beanstalkd

[program:beanstalkd]
directory = /data/beanstalkd/
command = /usr/local/beanstalkd/bin/beanstalkd -b /data/beanstalkd/

Python2.7 支持 ThreadPoolExecutor 的话还需要 futures 包

pip install futures

上Server端代码

import time
import tempfile
import libtorrent
import os
import os.path as path
import shutil
from concurrent import futures
import beanstalkc

def hash2torrent(torrent_hash, timeout = None):
    torrent_hash = torrent_hash.lower()
    print("start download: %s" % (torrent_hash))
    magnet = "magnet:?xt=urn:btih:" + torrent_hash

    directory = path.join("torrents", torrent_hash[0:2], torrent_hash[-2:])
    output = path.join(directory, torrent_hash + ".torrent")

    if not path.exists(directory):
        os.makedirs(directory)

    if path.exists(output):
        print('Already exists.')
        return output

    tempdir = tempfile.mkdtemp()
    session = libtorrent.session()

    session.add_dht_router('router.bittorrent.com', 6881)
    session.add_dht_router('router.utorrent.com', 6881)
    session.add_dht_router('router.bitcomet.com', 6881)
    session.add_dht_router('dht.transmissionbt.com', 6881)
    session.add_dht_router("dht.aelitis.com", 6881)
    session.start_dht()

    params = {
        'save_path': tempdir,
        # 'storage_mode': libtorrent.storage_mode_t(2),
        # 'paused': False,
        # 'auto_managed': True,
        'duplicated_is_error': True
    }

    handle = libtorrent.add_magnet_uri(session, magnet, params)

    cost = 0
    while not handle.has_metadata():
        if timeout is not None and cost > timeout:
            print("Timeout.")
            # session.pause()
            session.remove_torrent(handle)
            shutil.rmtree(tempdir)
            return None
        time.sleep(1)
        cost = cost + 1
    # session.pause()
    print("Downloaded. %d" % (cost))

    # print 'got metadata, starting torrent download...'
    # while handle.status().state != libtorrent.torrent_status.seeding:
    #     s = handle.status()
    #     state_str = ['queued', 'checking', 'downloading metadata', 'downloading', 'finished', 'seeding', 'allocating']
    #     print '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s %.3f' % (s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, s.num_peers, state_str[s.state], s.total_download/1000000)
    #     time.sleep(3)

    torrent_info = handle.get_torrent_info()
    torrent_file = libtorrent.create_torrent(torrent_info)

    torrent_content = libtorrent.bencode(torrent_file.generate())

    with open(output, "wb") as f:
        f.write(torrent_content)
        f.close()

    session.remove_torrent(handle)
    shutil.rmtree(tempdir)
    return output

if __name__ == '__main__':
    beanstalk = beanstalkc.Connection(host='localhost', port=11300)
    timeout = None
    with futures.ProcessPoolExecutor(10) as executor:
        while True:
            job = beanstalk.reserve()
            torrent_hash = job.body
            job.delete()
            executor.submit(hash2torrent, torrent_hash, timeout)

实测,平均每个种子的下载时间需要 15 分钟左右。

访问可视化(续)

之前写过一片文章:访问可视化,但是局限性很大,于是利用周末的时间,改造了一番。

演示地址:http://tool.lu/visitor

打点的改造

老版本:在php里面进行打点

新版本:使用js加载空伪1px gif图片

数据分析和存储的改造

老版本:请求过来的时候,php分析,存进mysql

新版本:请求的时候,js分析,加载1px gif,nginx记录日志,python分析,存进数据库

数据的访问实时dump

老版本:监听mysql的binlog,publish到redis

新版本:分析nginx,存进数据库的同时,publish到redis

IP归属地 和 经纬度的查询

老版本:纯真数据库 + 百度地图地址反解

新版本:ip17mon + 腾讯地图行政区划latlag

预览

QQ20150209-1.png

1. js的打点日志

具体可以参考:网站统计中的数据收集原理及实现

不过我使用的方法由文章简化而来。

2. nginx的日志记录

server {
    listen 80;
    listen 443 ssl;
    server_name analytics.tool.lu;

    ssl_certificate vhosts/tool.lu.chained.crt;
    ssl_certificate_key vhosts/tool.lu.key;

    root /data/html/analytics.tool.lu/public;

    access_log off;
    error_log off;

    location / {
        index index.html;
        try_files $uri $uri/ =404;
    }

    location /__utm.gif {
        expires -1;
        if_modified_since off;
        # add_header Last-Modified "";
        empty_gif;
        access_log /data/log/nginx/analytics.tool.lu.access.log;
    }
}

3. nginx日志的分析

这里有几点要注意的:

  1. nginx日志我是使用logrotate切分的,要达到实时读取nginx日志的目的,需要保证读取的文件是最新切分出来的文件,而且在切分文件的时候要保证之前的文件已经处理完毕。
  2. 记录日志文件处理的offset,又是由于logrotate切分nginx日志的原因,如何保证记录的offset是唯一的,且能对应上日志文件,这里我使用了log文件创建的时间作为标识(在linux下是不储存文件的创建时间的,需要自己记录实现)。
  3. nginx log中时间格式的解析,基本上就是抓瞎了(不是所有版本的python strptime都支持timezone的)
  4. 如何通过腾讯地图 api 返回的数据建立索引,方便快速查找到 地址对应的经纬度

为了安全性考虑,这边就不贴 nginx log 解析的完整代码了。

ua-parser返回version的拼接

    def version(self, vs):
        v = ''
        if not vs['major']:
            return v
        v += vs['major']
        if not vs['minor']:
            return v
        v += '.' + vs['minor']
        if not vs['patch']:
            return v
        v += '.' + vs['patch']
        if not 'patch_minor' in vs or not vs['patch_minor']:
            return v
        v += '.' + vs['patch_minor']
        return v

腾讯地图行政区划数据索引的建立

        # IP17MON返回的数据带有,省市,可以优先市级别的索引,再省级别的索引
    def index(self):
        for province in self.data['result'][0]:
            self.indexed_provinces[province['name']] = province['location']
        for city in self.data['result'][1]:
            self.indexed_cities[city['name'] if 'name' in city else city['fullname']] = city['location']

python

list 的 shift

l = [1, 2]
first = l.pop(0)

parse nginx log的正则

self.pattern = re.compile(r'(?P<ipaddress>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - - \[(?P<dateandtime>\d{2}\/[a-z]{3}\/\d{4}:\d{2}:\d{2}:\d{2}) [+-]\d{4}\] ((\"(GET|POST) )(?P<url>.+)(http\/1\.1")) (?P<statuscode>\d{3}) (?P<bytessent>\d+) (["](?P<referrer>(\-)|(.+))["]) (["](?P<useragent>.+)["])', re.I)

nginx log时间的解析

datetime.datetime.strptime(fields['dateandtime'], '%d/%b/%Y:%X')

PS: 好久不写python已有些生疏

python 魔术函数调用的实现

场景

实现一个python调用java接口的功能,java接口是以http方式提供的。为了实现比较舒服的调用的方式,我不准备以send({"method": "echo"})这种方式调用,而是api.echo()

但是python里面并没有提供类似php的__call__callStatic的函数

根据python的__getattr__来实现一个,但是这个解决方案不完美,有局限性。

实现

# encoding: utf-8

import json
import urllib, urllib2

class OPS(object):
    s = None

    def __init__(self, s):
        self.s = s

    def sendRequest(self, msg):
        postData = json.dumps(msg)
        print(postData)
        req = urllib2.Request(self.s, postData)
        try:
            resp = urllib2.urlopen(req)
        except Exception as e:
            return False
        cnt = resp.read()
        try:
            cnt = json.loads(cnt)
        except Exception as e:
            pass
        return cnt

    def __getattr__(self, name):
        def func(*args, **kwargs):
            # args 不处理 (由于是序列化成json传输的,python的dict是无序的,抛弃对list的处理)
            data = {
                'method': name,
                'parameter': kwargs
            }
            return self.sendRequest(data)
        return func

if __name__ == '__main__':
    ops = OPS('你的http接口调用地址')
    print(ops.hello(word="你好"))

PHP unpack VS Python unpack

unpack 对二进制数据解包。

php unpack的结果 数组的索引是从 1 开始的
python unpack的结果是 元祖,索引从 0 开始

php unpack可以传大于需要解包长度的二进制串
python unpack只能传入需要的长度的二进制串

php unpack 和 python unpack 的解包格式不一样,例如:

这是纯真数据库自动更新的例子: QQWryUpdate

// php
unpack('V6', $bin);

# python
import struct
struct.unpack('<6L', bin[0:24])

2017-07-25更新

echo pack('H*', $hash);
import codecs
from base64 import b32encode

print(b32encode(codecs.decode(hash, 'hex')).decode())

[scrapy] laracasts爬虫

购买了1个月的laracasts.com的帐号,又怕有的时候没时间看,所以就都抓取下来。

  1. 将标题,描述都保存到mysql数据库
  2. 将视频下载到本地
CREATE TABLE `laracasts_lessons` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `title` varchar(255) NOT NULL DEFAULT '',
  `downlink` varchar(255) NOT NULL DEFAULT '',
  `description` text NOT NULL,
  `path` varchar(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;

这里说一下流程,代码就不贴出来了,有需要的可以下载附件。请先修改文件内的数据库用户名密码,和laracasts的账户,再执行./laracasts.sh

登录 -> 遍历列表页面 -> 遍历课程页面 -> 获取信息 -> 下载视频 -> 保存到MySQL

laracasts.zip

python抓取taobao ip数据库

准备

由于taobao提供了rest api,所以这次就不用scrapy了,改用unirest

pip install unirest

代码

unirest是异步请求的(多线程),所以提供一个callback,但是要是有额外的参数需要传入,用全局变量的话会产生问题。

具体的实现可以根据纯真IP数据库的IP段来查询,单台机器抓取的话估计2~3天的样子,下面是单个IP段抓取的实现代码:

# encoding: utf-8
import time, unirest

def pp(extra):
    # 其实这种做法和js很像,用一个闭包限制变量的作用域
    def p(resp):
        print(extra)
        # 在这里做数据的处理和储存
    return p

def main():
    r = ['192.9.201.0', '192.9.201.255'];
    unirest.get('http://ip.taobao.com/service/getIpInfo.php', headers = {}, params = {'ip': r[0]}, auth = (), callback = pp(r))
    # taobao访问限制:为了保障服务正常运行,每个用户的访问频率需小于10qps。
    time.sleep(.5)

if __name__ == '__main__':
    main()

cython将python转换为c

选择

之前使用过pyinstaller打包python程序,但是遇到读取相对路径下的配置文件的问题,而且这些只是预编译成python的2进制码。

选择cython和pyinstaller的原因主要考虑的是批量部署的方便性。cython能直接将python翻译成c,然后编译,在安全性上有了一定的保障。

安装cython

选择最简单的安装方法,使用pip包管理

pip install cython
# 在网上找的一个脚本,感觉很好用的,这里直接编译成可执行程序
mkcython.sh -e main.py

mkcython.sh.zip

pygments生成图片中的中文

先上图片:preview.png

非要中英文都有的字体么

说实话,原生的“中英文都有的”字体,都不是很适合用来显示代码。而网上有个雅黑和consolas的混合字体,但感觉对字体的依赖性比较大

pygments有个好处,就是本来就支持高亮的结果存为图片,于是要对其进行修改,让他用不用的字体来渲染中文和英文。

code = '''
#!/usr/bin/env python
# encoding: utf-8

from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import ImageFormatter
# 可以有中文么a可以的
code = ""
content = highlight(code, PythonLexer(), ImageFormatter(font_name = 'WenQuanYi Zen Hei'))
with open('a.png', 'wb') as handle:
	handle.write(content)
	handle.close()
'''.decode('utf-8')
# content = highlight(code, PythonLexer(), ImageFormatter(font_name = 'Consolas', line_numbers = False, font_size = 20))
content = highlight(code, PythonLexer(), ImageFormatter(font_name = 'Consolas', cfont_name = 'Microsoft Yahei', line_numbers = False, font_size = 20, cfont_size = 13))
with open('a.png', 'wb') as handle:
	handle.write(content)
	handle.close()

我修改了一下image的生成,在原来的基础上加了两个参数cfont_name, cfont_size

原理就是分离出中文,然后使用中文字体渲染,而英文使用英文字体渲染。

expect的使用

expect可以自动化执行需要交互的命令。

安装

yum -y install expect

使用

创建一个文件test.exp

#!/usr/bin/env expect
spawn php go-pear.phar
expect ":"
send "\n"
interact
expect test.exp

python版本

pip install pexpect
#!/usr/bin/env python
# encoding: utf-8
import pexpect

cmd = 'php go-pear.phar'
child = pexpect.spawn(cmd, timeout=None)
child.expect(':')
child.sendline('')
# child.interact()
child.close()

自动禁用连接数高的IP

python代码

由于最近服务器受到大流量的攻击,于是写了该脚本。

#! /usr/bin/env python
# encoding: utf-8

import os, time

command = "/bin/netstat -antp|grep :80|awk ' ''{print $5}'|awk -F: '{print $1}'|sort -r|uniq -c|sort -n -k1 -r"
maxconn = 150

handle = os.popen(command)
for line in handle:
	fields = line.strip().split(' ')
	if len(fields) != 2:
		continue
	conn, ip = fields
	# 由于我们使用的是nginx作为反向代理,访问apache,所以这里要加上127.0.0.1
	if int(conn) > maxconn and ip != '127.0.0.1':
		print(time.strftime('%Y-%m-%d %X'), conn, ip)
		# 这里只会临时将ip拒绝,当重新启动iptables服务的时候失效
		os.system('/sbin/iptables -I INPUT -s %s/24 -j DROP' % ip)
handle.close()

定时执行

将python文件加入crontab中即可。

使用python Queue创建文档格式转换服务

该项目比较简单,只具备文件排队转换,不具有查看队列的功能,使用openoffice和unoconv作为转换程序,程序不提供建立系统服务

使用场景,一般使用thrift建立服务,将该程序挂载在里面。

#!/usr/bin/env python
#encoding: utf-8
#author: xiaozi <245565986@qq.com>
import Queue, threading, os, subprocess
import time

class Converter(object):
	def __init__(self):
		self._tasks = Queue.Queue()
		self._docs = Queue.Queue()
		self._pdfs = Queue.Queue()

		self._toPdfThreads = []
		self._toSwfThreads = []
		self._toPdfThreadNum = 4
		self._toSwfThreadNum = 4

		self._dispatchThread = threading.Thread(target=self.dispatchTask)
		self._dispatchThread.setDaemon(True)

		for i in range(self._toPdfThreadNum):
			pdfThread = threading.Thread(target=self.toPdf)
			pdfThread.setDaemon(True)
			self._toPdfThreads.append(pdfThread)

		for i in range(self._toSwfThreadNum):
			swfThread = threading.Thread(target=self.toSwf)
			swfThread.setDaemon(True)
			self._toSwfThreads.append(swfThread)
		
		self._dispatchThread.start()
		for i in range(self._toPdfThreadNum):
			self._toPdfThreads[i].start()
		for i in range(self._toSwfThreadNum):
			self._toSwfThreads[i].start()

	def addTask(self, path):
                print(path)
		self._tasks.put(path)

	def dispatchTask(self):
		while True:
			path = self._tasks.get()
			if not path:
				break
			ext = os.path.splitext(path)[1][1:].lower()
			if ext == 'doc' or ext == 'docx':
				self._docs.put(path)
			elif ext == 'pdf':
				self._pdfs.put(path)

	def toPdf(self):
		while True:
			path = self._docs.get()
			if not path:
				break
			subprocess.Popen(
				['python', 'unoconv', '-f', 'pdf', path]
			).communicate()
			newpath = os.path.splitext(path)[0] + '.pdf'
			self._pdfs.put(newpath)

	def toSwf(self):
		while True:
			path = self._pdfs.get()
			if not path:
				break
			# do something
			path_target = os.path.splitext(path)[0]
			if not os.path.exists(path_target):
				os.makedirs(path_target)
			subprocess.Popen(['pdf2swf', path, '-o', path_target + '/page%.swf', '-f', '-T', '9', '-t', '-s', 'storeallcharacters']).communicate()
			print(path)

	def close(self):
		self._tasks.put(None)
		self._dispatchThread.join()

		for i in range(self._toPdfThreadNum):
			self._docs.put(None)
		for i in range(self._toPdfThreadNum):
			self._toPdfThreads[i].join()
		# '转换为pdf'线程都结束之后,再结束'转换为swf'的线程
		for i in range(self._toSwfThreadNum):
			self._pdfs.put(None)
		for i in range(self._toSwfThreadNum):
			self._toSwfThreads[i].join()

if __name__ == '__main__':
	converter = Converter()
	converter.addTask('docs/paper.docx')

	time.sleep(3)

	converter.close()

下面是使用thrift建立的service

#!/usr/bin/env python
# encoding: utf-8

import sys, time

import convert

from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

sys.path.append('thrift/gen-py')

from ConvertApi import *
from ConvertApi.ttypes import *

class ServerHandle(ConvertApi.Iface):
	def __init__(self):
		self.converter = convert.Converter()

	def addTask(self, path):
		self.converter.addTask(path)
		return True

def main(port = 1990):
	processor = ConvertApi.Processor(ServerHandle())
	transport = TSocket.TServerSocket(port = port)
	tfactory = TTransport.TBufferedTransportFactory()
	pfactory = TBinaryProtocol.TBinaryProtocolFactory()

	server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
	print('Starting server on :%d' % port)
	server.serve()

if __name__ == '__main__':
	port = 110110
	main(port)

thrift文件如下(该接口提供的比较简单):

service ConvertApi {
	bool addTask(1: string path);
}

Openfire + err

安装Openfire

Openfire是IM的服务器端,下载地址:http://www.igniterealtime.org/downloads/index.jsp,选择linux rpm包下载

rpm -ivh openfire-3.7.1-1.i386.rpm

配置Openfire

首先在mysql里面建立一个数据库openfire,访问9090端口,进入网页进行初始化配置,增加一个用户,下面的err配置的时候会用到

安装err

err是一个IM机器人,项目地址:https://github.com/gbin/err,最简单的安装方法easy_install

easy_install err

建立几个需要用到的文件夹

mkdir -p /var/www/err
mkdir -p /var/log/err
mkdir -p /var/lib/err

chmod -R 777 /var/www/err
chmod -R 777 /var/log/err
chmod -R 777 /var/lib/err

cp /usr/lib/python2.6/site-packages/err-1.6.7-py2.6.egg/errbot/config-template.py /var/www/err/config.py

cd /var/www/err/

# 按照自己的要求修改吧,把上面配置的用户加到这个里面
vi config.py

其他

err.py命令启动的时候,可能还会报其他的模块不存在,直接easy_install好了

关于机器人插件的开发,可以参考项目主页上的wiki