* sqlachmeyをmultiprocessingで使う [#a18d3387]
マルチコア環境で、バッチ処理を早くするため、sqlaclhemyでmultiprocessingを行いたい。

* 実験環境 [#p1501c53]
+ ubuntu9.04
+ python2.6.2
+ sqlalchemy 0.55
* ポイント [#h687cbd8]
+ sqlalcemyは、Sessionではなく、scoped_sessionを使う
+ multiprocessingでは、Lockを使う。
* 良く分からない。 [#be3042aa]
+ sqlalchemyのsession,scoped_sessionの区別が良く分からない。

* ソースコード [#ocdcd3c9]

 #!/usr/bin/env python
 # -*- coding:utf-8 -*-
 from multiprocessing import *
 from sqlalchemy import *
 from sqlalchemy.orm import *
 import time
 ##engineとmetadata作る
 def mkMeta(url,echo=False):
     engine=create_engine(url,echo=echo)
     return MetaData(bind=engine),engine
 #table_test テーブルmapping用の、TableTestオブジェクト
 class TableTest(object):
     def __init__(self,name):
         self.name=name
 #テーブルテスト定義,実際には、nameしか使ってない。
 def table_def(meta):
     table_test=Table('table_test',meta,
                      Column('id', Integer, primary_key=True),
                      Column('name', String(40), nullable=False),
                      Column('created_at', DateTime, nullable=False, default=func.now()),
                      Column('updated_at', DateTime, nullable=False, default=func.now(), onupdate=func.now()),
                      )
     #マッピングします。
     mapper(TableTest,table_test)
     if not table_test.exists():
         table_test.create()
 
 def mksession(engine):
     Session=sessionmaker(bind=engine)
     #Sessionを作ったあと、scoped_sessionを作る。
     return scoped_session(Session)
     #return Session
 
 db_uri='mysql://root@localhost/ss_test'
 def createData(sess,lock,pid,offset=0,limit=0):
     '''
 Processで実行される関数
 offsetとlimit,session,lockを受け取る。
 sess,lockは事前に作ってある。 
 最初に検索して、無かったらレコードを追加する処理を書いた。
     '''
     t1=time.time()
     print 'start pid=%d sess=%s' % (pid,str(sess))
     for i in range(offset,limit):
         nm='pid=%d,i=%d' % (pid,i)
         lock.acquire()
         ql=sess.query(TableTest).filter(TableTest.name==nm)
         c=ql.count()
         lock.release()
         if c==0:
             t=TableTest(nm)
             lock.acquire()
             sess.add(t)
             lock.release()
 
     t1=time.time()-t1
     print 'end pid=%d time=%d ms' % (pid,t1*1000)
 def main():
     t1=time.time()
     m,e=mkMeta(db_uri,echo=False)
     table_def(m)
     sz=3000
     Session=mksession(e)
     sess=Session()
     lock=Lock()
     pl=[]
     #プロセスを生成、事前に生成した、scopped_sessionとlockを引き渡す。
     for i in range(10):
         p=Process(target=createData,args=(sess,lock,i,i*sz,(i+1)*sz))
         p.start()
         pl.append(p)
     #生成したプロセスが終わるのを待ちます。
     for p in pl:
         print 'pre join',p,p.is_alive()
         p.join()
         print 'post join',p,p.is_alive()
     #いろいろと終わったので、処理します。
     print 'sess start'
     sess.flush()
     print 'done sess flush'
     sess.commit()
     print 'done sess commit'
     sess.close()
     print 'done sess close'
     t1=time.time()-t1
     print 'time=%d ms'  % (t1*1000)
 if __name__=='__main__':main()

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS