2012年11月

使用python Queue创建文档格式转换服务

该项目比较简单,只具备文件排队转换的功能,不具有查看队列的功能;使用 OpenOffice 和 unoconv 作为转换程序,程序本身不提供建立系统服务的功能。

使用场景:一般使用 thrift 建立服务,将该程序挂载在其中。

#!/usr/bin/env python
#encoding: utf-8
#author: xiaozi <245565986@qq.com>
import Queue, threading, os, subprocess
import time

class Converter(object):
    """Three-stage, queue-driven document conversion pipeline.

    Paths submitted through addTask() are routed by file extension:
    ``.doc``/``.docx`` files are converted to PDF with unoconv, and every
    PDF (submitted directly or produced by the first stage) is converted
    to per-page SWF files with pdf2swf.  The constructor starts one
    dispatcher thread plus the PDF and SWF worker threads, all as daemons.

    ``None`` is the shutdown sentinel on every internal queue; call
    close() to drain the queues and stop the threads.
    """

    def __init__(self):
        """Create the queues and start the dispatcher and worker threads."""
        self._tasks = Queue.Queue()  # raw paths handed to addTask()
        self._docs = Queue.Queue()   # doc/docx paths waiting for PDF conversion
        self._pdfs = Queue.Queue()   # pdf paths waiting for SWF conversion

        self._toPdfThreadNum = 4
        self._toSwfThreadNum = 4

        self._dispatchThread = self._newDaemon(self.dispatchTask)
        self._toPdfThreads = [
            self._newDaemon(self.toPdf) for _ in range(self._toPdfThreadNum)
        ]
        self._toSwfThreads = [
            self._newDaemon(self.toSwf) for _ in range(self._toSwfThreadNum)
        ]

        # Start only once every thread object exists, so the instance is
        # fully initialised before any worker touches it.
        self._dispatchThread.start()
        for worker in self._toPdfThreads + self._toSwfThreads:
            worker.start()

    @staticmethod
    def _newDaemon(target):
        """Return an unstarted daemon thread that will run *target*."""
        thread = threading.Thread(target=target)
        thread.daemon = True
        return thread

    @staticmethod
    def _run(argv):
        """Run an external command and wait for it to exit.

        Returns False when the executable cannot be launched, so that a
        missing tool does not kill the calling worker thread.
        """
        try:
            subprocess.call(argv)
        except OSError as err:
            print('cannot run %s: %s' % (argv[0], err))
            return False
        return True

    def addTask(self, path):
        """Queue *path* for conversion.

        Empty paths are ignored: they can never name a convertible file,
        and must not be confused with the shutdown sentinel.
        """
        print(path)
        if not path:
            print('empty path ignored')
            return
        self._tasks.put(path)

    def dispatchTask(self):
        """Route queued paths to the PDF or SWF stage by file extension.

        Runs until the ``None`` sentinel is read from the task queue.
        """
        while True:
            path = self._tasks.get()
            # Only None means "stop"; any other falsy value (e.g. '') is
            # just an unusable task and must not end the dispatcher.
            if path is None:
                break
            ext = os.path.splitext(path)[1][1:].lower()
            if ext in ('doc', 'docx'):
                self._docs.put(path)
            elif ext == 'pdf':
                self._pdfs.put(path)
            else:
                print('unsupported file type, skipped: %s' % path)

    def toPdf(self):
        """Worker loop: convert doc/docx files to PDF, then queue the PDF.

        Runs until the ``None`` sentinel is read from the docs queue.
        """
        while True:
            path = self._docs.get()
            if path is None:
                break
            launched = self._run(['python', 'unoconv', '-f', 'pdf', path])
            newpath = os.path.splitext(path)[0] + '.pdf'
            # Hand the file to the SWF stage only if the PDF really exists;
            # otherwise pdf2swf would be run on a missing input.
            if launched and os.path.exists(newpath):
                self._pdfs.put(newpath)
            else:
                print('pdf conversion failed, skipped: %s' % path)

    def toSwf(self):
        """Worker loop: convert each queued PDF to SWF pages.

        Output goes to a directory named after the PDF without its
        extension.  Runs until the ``None`` sentinel is read from the
        pdfs queue.
        """
        while True:
            path = self._pdfs.get()
            if path is None:
                break
            path_target = os.path.splitext(path)[0]
            try:
                os.makedirs(path_target)
            except OSError:
                # The directory may already exist, possibly created a moment
                # ago by another SWF worker; anything else is a real failure.
                if not os.path.isdir(path_target):
                    print('cannot create output directory: %s' % path_target)
                    continue
            self._run(['pdf2swf', path, '-o', path_target + '/page%.swf',
                       '-f', '-T', '9', '-t', '-s', 'storeallcharacters'])
            print(path)

    def close(self):
        """Drain the pipeline stage by stage and stop all threads.

        Blocks until every task queued before the call has passed through
        the stages it needs.
        """
        self._tasks.put(None)
        self._dispatchThread.join()

        for _ in self._toPdfThreads:
            self._docs.put(None)
        for worker in self._toPdfThreads:
            worker.join()

        # Stop the SWF workers only after every PDF worker has finished,
        # because the PDF workers still feed the pdfs queue until then.
        for _ in self._toSwfThreads:
            self._pdfs.put(None)
        for worker in self._toSwfThreads:
            worker.join()

if __name__ == '__main__':
    # Demo run: queue one sample document, give the workers a moment,
    # then shut the pipeline down.
    demo = Converter()
    demo.addTask('docs/paper.docx')
    time.sleep(3)
    demo.close()

下面是使用thrift建立的service

#!/usr/bin/env python
# encoding: utf-8

import sys, time

import convert

from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

sys.path.append('thrift/gen-py')

from ConvertApi import *
from ConvertApi.ttypes import *

class ServerHandle(ConvertApi.Iface):
    """Thrift handler for the ConvertApi service, backed by a Converter."""

    def __init__(self):
        # The conversion pipeline is created once per handler instance.
        self.converter = convert.Converter()

    def addTask(self, path):
        """Hand *path* to the converter and report success.

        Always returns True: the path is only enqueued here, so the
        outcome of the conversion itself is not known at this point.
        """
        enqueue = self.converter.addTask
        enqueue(path)
        return True

def main(port=1990):
    """Build a threaded Thrift server for ConvertApi on *port* and serve.

    Uses a buffered transport and the binary protocol; the call ends in
    server.serve().
    """
    server = TServer.TThreadedServer(
        ConvertApi.Processor(ServerHandle()),
        TSocket.TServerSocket(port=port),
        TTransport.TBufferedTransportFactory(),
        TBinaryProtocol.TBinaryProtocolFactory(),
    )
    print('Starting server on :%d' % port)
    server.serve()

if __name__ == '__main__':
    # The previous value, 110110, is above the largest valid TCP port
    # (65535), so the server socket could never bind to it.
    # NOTE(review): 11011 is a guess at the intended port -- confirm.
    port = 11011
    main(port)

thrift文件如下(该接口提供的比较简单):

// Conversion service interface: exposes a single call for submitting a file path.
service ConvertApi {
	// Submit the file at `path`; the result is a boolean status.
	bool addTask(1: string path);
}