sqlachmeyをmultiprocessingで使う †マルチコア環境で、バッチ処理を早くするため、sqlaclhemyでmultiprocessingを行いたい。 実験環境 †
ポイント †
良く分からない。 †
ソースコード †#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import * from sqlalchemy import * from sqlalchemy.orm import * import time ##engineとmetadata作る def mkMeta(url,echo=False): engine=create_engine(url,echo=echo) return MetaData(bind=engine),engine #table_test テーブルmapping用の、TableTestオブジェクト class TableTest(object): def __init__(self,name): self.name=name #テーブルテスト定義,実際には、nameしか使ってない。 def table_def(meta): table_test=Table('table_test',meta, Column('id', Integer, primary_key=True), Column('name', String(40), nullable=False), Column('created_at', DateTime, nullable=False, default=func.now()), Column('updated_at', DateTime, nullable=False, default=func.now(), onupdate=func.now()), ) #マッピングします。 mapper(TableTest,table_test) if not table_test.exists(): table_test.create() def mksession(engine): Session=sessionmaker(bind=engine) #Sessionを作ったあと、scoped_sessionを作る。 return scoped_session(Session) #return Session db_uri='mysql://root@localhost/ss_test' def createData(sess,lock,pid,offset=0,limit=0): ''' Processで実行される関数 offsetとlimit,session,lockを受け取る。 sess,lockは事前に作ってある。 最初に検索して、無かったらレコードを追加する処理を書いた。 ''' t1=time.time() print 'start pid=%d sess=%s' % (pid,str(sess)) for i in range(offset,limit): nm='pid=%d,i=%d' % (pid,i) lock.acquire() ql=sess.query(TableTest).filter(TableTest.name==nm) c=ql.count() lock.release() if c==0: t=TableTest(nm) lock.acquire() sess.add(t) lock.release() t1=time.time()-t1 print 'end pid=%d time=%d ms' % (pid,t1*1000) def main(): t1=time.time() m,e=mkMeta(db_uri,echo=False) table_def(m) sz=3000 Session=mksession(e) sess=Session() lock=Lock() pl=[] #プロセスを生成、事前に生成した、scopped_sessionとlockを引き渡す。 for i in range(10): p=Process(target=createData,args=(sess,lock,i,i*sz,(i+1)*sz)) p.start() pl.append(p) #生成したプロセスが終わるのを待ちます。 for p in pl: print 'pre join',p,p.is_alive() p.join() print 'post join',p,p.is_alive() #いろいろと終わったので、処理します。 print 'sess start' sess.flush() print 'done sess flush' sess.commit() print 'done sess commit' sess.close() print 'done sess close' t1=time.time()-t1 print 'time=%d ms' % (t1*1000) if __name__=='__main__':main() |