Subscribed unsubscribe Subscribe Subscribe

GAE/PyでDatastoreのデータを全削除するためのbulkdeleter.pyを書いた

GAE/Pyでbulkloaderを使ってデータをアップロードする - すぎゃーんメモ
GAE/Pyでbulkloaderを使ってデータをダウンロード/削除する - すぎゃーんメモ
の続き。

そもそもDatastoreからfetchしてきた時点で(この処理はマルチスレッドで行われているっぽい)削除処理をかけてやれればいいのだけど、残念ながらこちらでカスタマイズできるのはExporterクラスについてだけのようで、残念ながらこいつはすべてのデータをダウンロードした後に生成されたgeneratorしか扱うことができない。

という問題があって、bulkloaderを使用して指定したkindのエンティティを効率よく全削除することは出来なかった。
そうなるとbulkloaderを使わずに自分でThread作ったりしてそれぞれでremote_apiを叩いていくようにする、ということしか思いつかなかったのだけど、せっかくbulkloaderが効率よくremote_apiからエンティティを引っ張ってくることができるのだから、それを利用しない手はない、と思い軽くhackしてみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import getopt
import sys
sys.path.extend([
        '/usr/local/google_appengine/',
        '/usr/local/google_appengine/lib/antlr3/',
        ])
from google.appengine.ext import db
from google.appengine.tools import appcfg
from google.appengine.tools import bulkloader
from google.appengine.tools.bulkloader import (
    BulkExporterThread,
    CheckOutputFile,
    Exporter)


# CheckOutputFile 無効化
CheckOutputFile.func_code = (lambda x: x).func_code

# Exporterは不要なのでDummyのインスタンスを返すようにする
def DummyExporter(kind):
    class dummy:
        def initialize(self, filename, exporter_opts):
            pass
        def finalize(self):
            pass
        def output_entities(self, entity_generator):
            pass
    return dummy()
Exporter.RegisteredExporter.func_code = DummyExporter.func_code

# 削除処理(BulkExporterThreadのTransferItemメソッドを横取りする)
def delete(self, item):
    retval = self.request_manager.GetEntities(item)
    # 取得してきたentitiesを全削除
    db.delete(retval.keys)
    retval.entities = []
    retval.keys = []
    return retval

BulkExporterThread.TransferItem.im_func.func_code = delete.func_code


if __name__ == '__main__':
    # 引数, オプションを追加
    sys.argv.insert(1, 'download_data')
    sys.argv.insert(2, '--filename=')
    sys.argv.insert(3, '--config_file=' + sys.argv[0])
    # 引数で指定したkind名でModelクラスを定義
    opts, unused_args =  getopt.getopt(sys.argv[2:], None, bulkloader.FLAG_SPEC)
    kind = [x[1] for x in opts if x[0] == '--kind'][0]
    exec "class %s(db.Model): pass" % kind

    appcfg.main(sys.argv)

これを、対象とするkind名とアプリケーションのディレクトリを指定して実行するだけ。

$ ./bulkdeleter.py --kind=HogeFugaPiyo ../application/

bulkloaderで使う他のオプションを指定することもできる、はず。

$ ./bulkdeleter.py --kind=HogeFugaPiyo --num_threads=50 --batch_size=50 ../application/


基本的に"appcfg.py download_data"と同じ操作をさせ、その中でDatastoreからfetchしてきたentitiesをExporterで書き出す代わりに全削除しているだけ。
変更箇所を最低限に抑えるためにピンポイントでメソッドの実装を入れ替えている。
関数の呼び出し時の処理を入れ替える - すぎゃーんメモ


これを使うことで、アップロードとする場合とほぼ同じ速度で全削除ができる。
ただし、マルチスレッドな並列処理を行うためには対象となるkindの__key__インデックスが作成されている必要がある。
これがなくても全削除はできるが、並列ではなく順次処理でしか削除できない。
従って、並列処理をさせるためには、削除を行う前にindex.yamlで対象となるkindに対して以下のようにindexを指定して、インデックスを作成しておく。これはExporterを使うダウンロード処理でも同じことが言える。

- kind: HogeFugaPiyo
  properties:
  - name: __key__
    direction: desc