使用python Queue创建文档格式转换服务

该项目比较简单,只具备文件排队转换,不具有查看队列的功能,使用openoffice和unoconv作为转换程序,程序不提供建立系统服务

使用场景,一般使用thrift建立服务,将该程序挂载在里面。

#!/usr/bin/env python
#encoding: utf-8
#author: xiaozi <245565986@qq.com>
import Queue, threading, os, subprocess
import time

class Converter(object):
	def __init__(self):
		self._tasks = Queue.Queue()
		self._docs = Queue.Queue()
		self._pdfs = Queue.Queue()

		self._toPdfThreads = []
		self._toSwfThreads = []
		self._toPdfThreadNum = 4
		self._toSwfThreadNum = 4

		self._dispatchThread = threading.Thread(target=self.dispatchTask)
		self._dispatchThread.setDaemon(True)

		for i in range(self._toPdfThreadNum):
			pdfThread = threading.Thread(target=self.toPdf)
			pdfThread.setDaemon(True)
			self._toPdfThreads.append(pdfThread)

		for i in range(self._toSwfThreadNum):
			swfThread = threading.Thread(target=self.toSwf)
			swfThread.setDaemon(True)
			self._toSwfThreads.append(swfThread)
		
		self._dispatchThread.start()
		for i in range(self._toPdfThreadNum):
			self._toPdfThreads[i].start()
		for i in range(self._toSwfThreadNum):
			self._toSwfThreads[i].start()

	def addTask(self, path):
                print(path)
		self._tasks.put(path)

	def dispatchTask(self):
		while True:
			path = self._tasks.get()
			if not path:
				break
			ext = os.path.splitext(path)[1][1:].lower()
			if ext == 'doc' or ext == 'docx':
				self._docs.put(path)
			elif ext == 'pdf':
				self._pdfs.put(path)

	def toPdf(self):
		while True:
			path = self._docs.get()
			if not path:
				break
			subprocess.Popen(
				['python', 'unoconv', '-f', 'pdf', path]
			).communicate()
			newpath = os.path.splitext(path)[0] + '.pdf'
			self._pdfs.put(newpath)

	def toSwf(self):
		while True:
			path = self._pdfs.get()
			if not path:
				break
			# do something
			path_target = os.path.splitext(path)[0]
			if not os.path.exists(path_target):
				os.makedirs(path_target)
			subprocess.Popen(['pdf2swf', path, '-o', path_target + '/page%.swf', '-f', '-T', '9', '-t', '-s', 'storeallcharacters']).communicate()
			print(path)

	def close(self):
		self._tasks.put(None)
		self._dispatchThread.join()

		for i in range(self._toPdfThreadNum):
			self._docs.put(None)
		for i in range(self._toPdfThreadNum):
			self._toPdfThreads[i].join()
		# '转换为pdf'线程都结束之后,再结束'转换为swf'的线程
		for i in range(self._toSwfThreadNum):
			self._pdfs.put(None)
		for i in range(self._toSwfThreadNum):
			self._toSwfThreads[i].join()

if __name__ == '__main__':
	converter = Converter()
	converter.addTask('docs/paper.docx')

	time.sleep(3)

	converter.close()

下面是使用thrift建立的service

#!/usr/bin/env python
# encoding: utf-8

import sys, time

import convert

from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

sys.path.append('thrift/gen-py')

from ConvertApi import *
from ConvertApi.ttypes import *

class ServerHandle(ConvertApi.Iface):
	def __init__(self):
		self.converter = convert.Converter()

	def addTask(self, path):
		self.converter.addTask(path)
		return True

def main(port = 1990):
	processor = ConvertApi.Processor(ServerHandle())
	transport = TSocket.TServerSocket(port = port)
	tfactory = TTransport.TBufferedTransportFactory()
	pfactory = TBinaryProtocol.TBinaryProtocolFactory()

	server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
	print('Starting server on :%d' % port)
	server.serve()

if __name__ == '__main__':
	port = 110110
	main(port)

thrift文件如下(该接口提供的比较简单):

service ConvertApi {
	bool addTask(1: string path);
}

标签: none

已有 4 条评论

  1. 一般的c库都是提供一堆函数的,根本没有python这样从type级别定义thread-local的机制,c库根本就不知道你的调用是从哪个线程过来的,你要几个线程共用一个context c库有什么办法?硬要用thread-local变量的话似乎是可以做到,但是我很怀疑真有这么做的库,为了一个虚幻的“你怎么用都没有问题”来付出性能的代价,取决于系统,对于thread-local变量的每一次存取都是一次函数调用,甚至可能是系统调用。比如libmemcached“从一开始就强调自己是线程安全的”,其实和libmemcache一模一样,它的文档写的清清楚楚 Without creating your own loickng structures you can not share a single memcached_st. 这个memcached_st和libmemcache的mc/mctxt没什么区别。

    1. 这跟文章有什么关系,说的我云里雾里的

  2. 沙发!

添加新评论