* sqlachmeyをmultiprocessingで使う [#a18d3387]
マルチコア環境で、バッチ処理を早くするため、sqlaclhemyでmultiprocessingを行いたい。
* 実験環境 [#p1501c53]
+ ubuntu9.04
+ python2.6.2
+ sqlalchemy 0.55
* ポイント [#h687cbd8]
+ sqlalcemyは、Sessionではなく、scoped_sessionを使う
+ multiprocessingでは、Lockを使う。
* 良く分からない。 [#be3042aa]
+ sqlalchemyのsession,scoped_sessionの区別が良く分からない。
* ソースコード [#ocdcd3c9]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import *
from sqlalchemy import *
from sqlalchemy.orm import *
import time
##engineとmetadata作る
def mkMeta(url,echo=False):
engine=create_engine(url,echo=echo)
return MetaData(bind=engine),engine
#table_test テーブルmapping用の、TableTestオブジェクト
class TableTest(object):
def __init__(self,name):
self.name=name
#テーブルテスト定義,実際には、nameしか使ってない。
def table_def(meta):
table_test=Table('table_test',meta,
Column('id', Integer, primary_key=True),
Column('name', String(40), nullable=False),
Column('created_at', DateTime, nullable=False, default=func.now()),
Column('updated_at', DateTime, nullable=False, default=func.now(), onupdate=func.now()),
)
#マッピングします。
mapper(TableTest,table_test)
if not table_test.exists():
table_test.create()
def mksession(engine):
Session=sessionmaker(bind=engine)
#Sessionを作ったあと、scoped_sessionを作る。
return scoped_session(Session)
#return Session
db_uri='mysql://root@localhost/ss_test'
def createData(sess,lock,pid,offset=0,limit=0):
'''
Processで実行される関数
offsetとlimit,session,lockを受け取る。
sess,lockは事前に作ってある。
最初に検索して、無かったらレコードを追加する処理を書いた。
'''
t1=time.time()
print 'start pid=%d sess=%s' % (pid,str(sess))
for i in range(offset,limit):
nm='pid=%d,i=%d' % (pid,i)
lock.acquire()
ql=sess.query(TableTest).filter(TableTest.name==nm)
c=ql.count()
lock.release()
if c==0:
t=TableTest(nm)
lock.acquire()
sess.add(t)
lock.release()
t1=time.time()-t1
print 'end pid=%d time=%d ms' % (pid,t1*1000)
def main():
t1=time.time()
m,e=mkMeta(db_uri,echo=False)
table_def(m)
sz=3000
Session=mksession(e)
sess=Session()
lock=Lock()
pl=[]
#プロセスを生成、事前に生成した、scopped_sessionとlockを引き渡す。
for i in range(10):
p=Process(target=createData,args=(sess,lock,i,i*sz,(i+1)*sz))
p.start()
pl.append(p)
#生成したプロセスが終わるのを待ちます。
for p in pl:
print 'pre join',p,p.is_alive()
p.join()
print 'post join',p,p.is_alive()
#いろいろと終わったので、処理します。
print 'sess start'
sess.flush()
print 'done sess flush'
sess.commit()
print 'done sess commit'
sess.close()
print 'done sess close'
t1=time.time()-t1
print 'time=%d ms' % (t1*1000)
if __name__=='__main__':main()