GAE/Pyでbulkloaderを使ってデータをアップロードする - すぎゃーんメモ
GAE/Pyでbulkloaderを使ってデータをダウンロード/削除する - すぎゃーんメモ
の続き。
そもそもDatastoreからfetchしてきた時点で(この処理はマルチスレッドで行われているっぽい)削除処理をかけてやれればいいのだけど、残念ながらこちらでカスタマイズできるのはExporterクラスについてだけのようで、残念ながらこいつはすべてのデータをダウンロードした後に生成されたgeneratorしか扱うことができない。
という問題があって、bulkloaderを使用して指定したkindのエンティティを効率よく全削除することは出来なかった。
そうなるとbulkloaderを使わずに自分でThread作ったりしてそれぞれでremote_apiを叩いていくようにする、ということしか思いつかなかったのだけど、せっかくbulkloaderが効率よくremote_apiからエンティティを引っ張ってくることができるのだから、それを利用しない手はない、と思い軽くhackしてみた。
#!/usr/bin/env python # -*- coding: utf-8 -*- import getopt import sys sys.path.extend([ '/usr/local/google_appengine/', '/usr/local/google_appengine/lib/antlr3/', ]) from google.appengine.ext import db from google.appengine.tools import appcfg from google.appengine.tools import bulkloader from google.appengine.tools.bulkloader import ( BulkExporterThread, CheckOutputFile, Exporter) # CheckOutputFile 無効化 CheckOutputFile.func_code = (lambda x: x).func_code # Exporterは不要なのでDummyのインスタンスを返すようにする def DummyExporter(kind): class dummy: def initialize(self, filename, exporter_opts): pass def finalize(self): pass def output_entities(self, entity_generator): pass return dummy() Exporter.RegisteredExporter.func_code = DummyExporter.func_code # 削除処理(BulkExporterThreadのTransferItemメソッドを横取りする) def delete(self, item): retval = self.request_manager.GetEntities(item) # 取得してきたentitiesを全削除 db.delete(retval.keys) retval.entities = [] retval.keys = [] return retval BulkExporterThread.TransferItem.im_func.func_code = delete.func_code if __name__ == '__main__': # 引数, オプションを追加 sys.argv.insert(1, 'download_data') sys.argv.insert(2, '--filename=') sys.argv.insert(3, '--config_file=' + sys.argv[0]) # 引数で指定したkind名でModelクラスを定義 opts, unused_args = getopt.getopt(sys.argv[2:], None, bulkloader.FLAG_SPEC) kind = [x[1] for x in opts if x[0] == '--kind'][0] exec "class %s(db.Model): pass" % kind appcfg.main(sys.argv)
これを、対象とするkind名とアプリケーションのディレクトリを指定して実行するだけ。
$ ./bulkdeleter.py --kind=HogeFugaPiyo ../application/
bulkloaderで使う他のオプションを指定することもできる、はず。
$ ./bulkdeleter.py --kind=HogeFugaPiyo --num_threads=50 --batch_size=50 ../application/
基本的に"appcfg.py download_data"と同じ操作をさせ、その中でDatastoreからfetchしてきたentitiesをExporterで書き出す代わりに全削除しているだけ。
変更箇所を最低限に抑えるためにピンポイントでメソッドの実装を入れ替えている。
関数の呼び出し時の処理を入れ替える - すぎゃーんメモ
これを使うことで、アップロードとする場合とほぼ同じ速度で全削除ができる。
ただし、マルチスレッドな並列処理を行うためには対象となるkindの__key__インデックスが作成されている必要がある。
これがなくても全削除はできるが、並列ではなく順次処理でしか削除できない。
従って、並列処理をさせるためには、削除を行う前にindex.yamlで対象となるkindに対して以下のようにindexを指定して、インデックスを作成しておく。これはExporterを使うダウンロード処理でも同じことが言える。
- kind: HogeFugaPiyo properties: - name: __key__ direction: desc