GAE/Pyでbulkloaderを使ってデータをダウンロード/削除する

前回は、bulkloaderによるアップロードについて色々調べて試した。
GAE/Pyでbulkloaderを使ってデータをアップロードする - すぎゃーんメモ


今度は、データのダウンロードと削除について調べる。

データをダウンロードする

アップロードとほぼ同様に行うことができる。remote_apiへのハンドラ設定などは前回と同じとする。
使用するのはbulkloader.Loaderクラスではなくbulkloader.Exporterクラス。


例えば前回使ったTimeLineというモデルでデータがDatastoreに格納されているとする。

# model.py
from google.appengine.ext import db


class TimeLine(db.Model):
    name = db.StringProperty()
    text = db.TextProperty()
    time = db.DateTimeProperty()


ダウンロード用に用意するスクリプトは以下のようになる。

# loader.py
from google.appengine.tools import bulkloader
from model import TimeLine


class TimeLineExporter(bulkloader.Exporter):
    def __init__(self):
        bulkloader.Exporter.__init__(self, 'TimeLine', [
                ('name', str,                         None),
                ('text', lambda x: x.encode('utf-8'), None),
                ('time', str,                         None),
                ])

exporters = [TimeLineExporter]

やはりLoaderクラスのときと同様に__init__()メソッド内でデータの変換方法を指定することになる。
リストの各タプルの3番目の要素はデフォルト値を指定するらしい。


これを使って、"appcfg.py download_data"コマンドを実行するだけで、データをダウンロードしてファイルに書き出してくれる。

$ PYTHONPATH=. appcfg.py download_data --config_file=loader.py --filename=./result.csv --kind=TimeLine <対象アプリケーションのディレクトリ>

"--filename"で指定するファイル名は絶対パスになるのかな?単にファイル名だけを指定してもFileNotWritableErrorで失敗するので絶対パスとかカレントディレクトリからの相対パスとかにしないとダメっぽい。既に存在するファイル名もNG。


これで、指定したkindのデータをすべてCSVに書き出すことができる。

CSV以外の形式でダウンロードする

アップロードと同様に、メソッドのオーバーライドをすることでCSV以外の形式でファイル保存することもできる。
bulkloader.Exporterクラスで、オーバーライドすべきメソッドは、output_entities。

  def output_entities(self, entity_generator):
    """Outputs the downloaded entities.

    This implementation writes CSV.

    Args:
      entity_generator: A generator that yields the downloaded entities
        in key order.
    """
    CheckOutputFile(self.output_filename)
    output_file = open(self.output_filename, 'w')
    logger.debug('Export complete, writing to file')
    output_file.writelines(self.__SerializeEntity(entity) + '\n'
                           for entity in entity_generator)

引数で渡されるのはentityを取り出すことの出来るgenerator、ファイル名は前処理でself.outpt_filenameに格納されている。
ここで実際にCSVのレコードを作っているのは__SerializeEntityメソッド、さらにその中で呼んでいる__EncodeEntityメソッドなのだけど、これらはprivateなメソッドなのでオーバーライドできない(まぁ実際には"_Exporter__EncodeEntity"というようなメソッド名にすればオーバーライドできちゃうけどw)。
作法としては出力のメインとなるこのoutput_entitiesメソッドを書き換えるだけにするのが良いと思われる。

例:XML形式で出力する

appengineのdb.Modelクラスには、to_xmlというインスタンスメソッドがある。これをそのまま使うと便利。

from google.appengine.tools import bulkloader
from model import TimeLine


class TimeLineExporter(bulkloader.Exporter):
    def __init__(self):
        bulkloader.Exporter.__init__(self, 'TimeLine', [
                ('name', str,                         None),
                ('text', lambda x: x.encode('utf-8'), None),
                ('time', str,                         None),
                ])
        
    def output_entities(self, entity_generator):
        file = open(self.output_filename, 'w')
        file.write('<entities>\n')
        for entity in entity_generator:
            file.write(entity.to_xml().encode('utf-8'))
        file.write('</entities>\n')
        
exporters = [TimeLineExporter]

こんなカンジに。

$ PYTHONPATH=. appcfg.py download_data --config_file=loader.py --filename=./result.xml --kind=TimeLine ../application
$ cat result.xml
<entities>
<entity kind="TimeLine" key="aghzdWdpMTk4MnIQCxIIVGltZUxpbmUY198BDA">
  <key>tag:sugi1982.gmail.com,2009-07-09:TimeLine[aghzdWdpMTk4MnIQCxIIVGltZUxpbmUY198BDA]</key>
  <property name="name" type="string">name40</property>
  <property name="text" type="text">テキスト40</property>
  <property name="time" type="gd:when">1900-01-01 10:42:26</property>
</entity>
<entity kind="TimeLine" key="aghzdWdpMTk4MnIQCxIIVGltZUxpbmUY2N8BDA">
  <key>tag:sugi1982.gmail.com,2009-07-09:TimeLine[aghzdWdpMTk4MnIQCxIIVGltZUxpbmUY2N8BDA]</key>
  <property name="name" type="string">name41</property>
  <property name="text" type="text">テキスト41</property>
  <property name="time" type="gd:when">1900-01-01 07:11:17</property>
</entity>
...
</entities>
例:JSON形式で出力する

各エンティティをJSONとしてシリアライズできるよう辞書などのデータに変換してやれば良い。

from google.appengine.tools import bulkloader
from model import TimeLine


class TimeLineExporter(bulkloader.Exporter):
    def __init__(self):
        bulkloader.Exporter.__init__(self, 'TimeLine', [
                ('name', str,                         None),
                ('text', lambda x: x.encode('utf-8'), None),
                ('time', str,                         None),
                ])
        
    def output_entities(self, entity_generator):
        from django.utils import simplejson
        file = open(self.output_filename, 'w')

        entities = []
        for entity in entity_generator:
            entities.append({
                    'name' : entity.name,
                    'text' : entity.text,
                    'time' : str(entity.time),
                    })
        file.write(simplejson.dumps(entities))
        
exporters = [TimeLineExporter]

こんなカンジに。

$ PYTHONPATH=. appcfg.py download_data --config_file=loader.py --filename=./result.json --kind=TimeLine ../application
$ cat result.json
[{"text": "\u30c6\u30ad\u30b9\u30c840", "name": "name40", "time": "1900-01-01 10:42:26"}, {...

データを全削除する

本来の使い方ではないかもしれないけど、このExporterクラスのoutput_entitiesメソッドですべてのエンティティを触ることができるので、ファイル出力に使わずに(もしくはファイル出力した後で)それらのエンティティを削除することもできる。

良くない例

すべてのエンティティをひとつずつ取り出して、一個ずつ削除する。

from google.appengine.tools import bulkloader
from model import TimeLine


class TimeLineExporter(bulkloader.Exporter):
    def __init__(self):
        bulkloader.Exporter.__init__(self, 'TimeLine', [
                ('name', str,                         None),
                ('text', lambda x: x.encode('utf-8'), None),
                ('time', str,                         None),
                ])
        
    def output_entities(self, entity_generator):
        for entity in entity_generator:
            entity.delete()
    
exporters = [TimeLineExporter]

確かにこれですべてのエンティティを削除できるが、毎回毎回DatastoreのAPIを叩くことになるのでおそらくDatastoreへの負荷が高い。
Dashboardで確認してみたところ、100件のデータを削除するために”Datastore API Calls”が123回ほど呼ばれ、時間もそれなりにかかっていた。

まとめて削除 その1

google.appengine.ext.dbパッケージではdeleteという関数が提供されていて、これはエンティティまたはキーのリストを渡すことで一気に削除することができる。

from google.appengine.tools import bulkloader
from model import TimeLine


class TimeLineExporter(bulkloader.Exporter):
    def __init__(self):
        bulkloader.Exporter.__init__(self, 'TimeLine', [
                ('name', str,                         None),
                ('text', lambda x: x.encode('utf-8'), None),
                ('time', str,                         None),
                ])
        
    def output_entities(self, entity_generator):
        from google.appengine.ext import db
        db.delete(list(entity_generator))
    
exporters = [TimeLineExporter]

こうすると、API Callの回数は上記と同じ100件のデータに対し24回となり、かかった時間は非常に短くなった。


しかし、この方法では問題が生じることもある。
あまり件数が多いと

google.appengine.api.datastore_errors.BadRequestError: cannot delete more than 500 entities in a single call

というエラーが出てしまう。

まとめて削除 その2

一気に全部を削除しようとはせず、数百個区切りで繰り返し一括削除を行う。

from google.appengine.tools import bulkloader
from model import TimeLine


class TimeLineExporter(bulkloader.Exporter):
    def __init__(self):
        bulkloader.Exporter.__init__(self, 'TimeLine', [
                ('name', str,                         None),
                ('text', lambda x: x.encode('utf-8'), None),
                ('time', str,                         None),
                ])
        
    def output_entities(self, entity_generator):
        from google.appengine.ext import db

        while True:
            entities = []
            for entity in entity_generator:
                entities.append(entity)
                if len(entities) >= 300:
                    break
                
            print len(entities)
            if len(entities) == 0:
                break
            db.delete(entities)
    
exporters = [TimeLineExporter]

300個程度までリストが膨らんだらそれを一気に削除するようにし、その処理をwhile文で回してみた。
こうすることで先ほどのエラーは避けられる。

課題

ただ、上記の処理も繰り返して続けていると負荷がかかりすぎてtimeoutになってしまったりするかもしれない。
データのアップロードやダウンロードと同様にThreadで処理を分けてある程度待ちながら行うようにするのが理想か…
そもそもDatastoreからfetchしてきた時点で(この処理はマルチスレッドで行われているっぽい)削除処理をかけてやれればいいのだけど、残念ながらこちらでカスタマイズできるのはExporterクラスについてだけのようで、残念ながらこいつはすべてのデータをダウンロードした後に生成されたgeneratorしか扱うことができない。
実際にQueryを投げてfetchを行っているのはRequestManagerクラスのGetEntitiesメソッドか。これがBulkExporterThreadに呼ばれている。このあたりをどうにかHackできればいいんだけど。。