GAE/Pyでbulkloaderを使ってデータをアップロードする

bulkloaderとは

  • remote_apiを使ってコンソールからDatastoreにデータをアップロードするためのツール。
  • 初期データとして大量のデータをDatastoreに突っ込んでおきたいときなどに便利。
  • 逆にDatastoreにある内容をダウンロードすることもできる。削除をするようには作られていない?
  • 今のところPython版のみ。Java版もそのうち出る?

http://code.google.com/intl/ja/appengine/docs/python/tools/uploadingdata.html

簡単な使用例

例えばアプリケーション内で、model.pyというファイルでこんなクラスを作るとする。

from google.appengine.ext import db

class PersonalData(db.Model):
    name = db.StringProperty()
    mail = db.EmailProperty()
    age  = db.IntegerProperty()


app.yamlではremote_apiの設定をする。

application: bulkloader-test
version: 1
runtime: python
api_version: 1

handlers:
- url: /remote_api
  script: $PYTHON_LIB/google/appengine/ext/remote_api/handler.py
  login: admin

- url: .*
  script: main.py


ここまではアプリケーションでの設定。bulkloaderはアプリケーションとはまた別なので別ディレクトリで作って良い。

.
|-- application
|   |-- app.yaml
|   |-- index.yaml
|   |-- main.py
|   |-- model.py
|   `-- template.html
`-- bulkloader
    |-- data.csv
    |-- loader.py
    `-- model.py -> ../application/model.py

こんなカンジで分けてみた。


data.csvがアップロードするためのデータ。なんちゃって個人情報CSVを作成してみた。

有田碧海,aritaouga@example.com,48
松本秀樹,matsumotohideki@example.com,30
谷本恵梨香,tanimotoerika@example.com,50
玉木誠治,tamakiseiji@example.com,20
飯島栄一,iijimaeiichi@example.com,70
...

ヘッダ行は邪魔なので削除。


loader.pyでは上記のデータのロード方法を定義することになる。ほぼドキュメント通りに書くとこんなカンジ。

from google.appengine.tools import bulkloader
from model import PersonalData


class DataLoader(bulkloader.Loader):
    def __init__(self):
        bulkloader.Loader.__init__(self, 'PersonalData', [
                ('name', lambda x: x.decode('s-jis')),
                ('mail', str),
                ('age',  int),
                ])

loaders = [DataLoader]

bulkloader.Loaderクラスを継承したクラスを作成して、loadersにリストで突っ込む。
ここでも同じmodel定義を使用するので、model.pyはシンボリックリンクで参照する。
タプルのリストではそれぞれのカラムのデータ変換方法を定義することになる。「なんちゃって個人情報」で吐き出されたCSV文字コードがShift-jisになっているようなので、それに合わせて変換。


ここまで出来れば、あとはappcfg.pyを使ってアップロードする。bulkloaderディレクトリで、

$ PYTHONPATH=. appcfg.py upload_data --config_file=loader.py --filename=data.csv --kind=PersonalData --url=http://localhost:8080/remote_api ../application/

というカンジでコマンドを打てばデータのアップロードが行われる。
上記の例は開発サーバに対して実行しているのでemailとpasswordを聞かれるけど何を入力しても成功する。
本番環境に対して実行する場合は--url指定は不要で、デプロイするときと同様のemailとpasswordで成功する。


DataViewerで確認してみる。

keyのnameを指定して重複したアップロードを避ける

このbulkloaderによるアップロードは、デフォルトではガンガン新しいエンティティを作成していくので、何度も実行すると同じデータが重複して登録されてしまう。
それを避けるために、keyのnameを指定する方法がある。bulkloader.Loaderクラスにはgenerate_keyというメソッドがあり、これをオーバーライドすることでkeyを指定できるらしい。

  def generate_key(self, i, values):
    """Generates a key_name to be used in creating the underlying object.

    The default implementation returns None.

    This method can be overridden to control the key generation for
    uploaded entities. The value returned should be None (to use a
    server generated numeric key), or a string which neither starts
    with a digit nor has the form __*__ (see
    http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html),
    or a db.Key instance.

    If you generate your own string keys, keep in mind:

    1. The key name for each entity must be unique.
    2. If an entity of the same kind and key already exists in the
       datastore, it will be overwritten.

    Args:
      i: Number corresponding to this object (assume it's run in a loop,
        this is your current count.
      values: list/tuple of str.

    Returns:
      A string to be used as the key_name for an entity.
    """
    return None


ので、例えば先程の例のloader.pyを以下のように書き換えてみる。

from google.appengine.tools import bulkloader
from model import PersonalData


class DataLoader(bulkloader.Loader):
    def __init__(self):
        bulkloader.Loader.__init__(self, 'PersonalData', [
                ('name', lambda x: x.decode('s-jis')),
                ('mail', str),
                ('age',  int),
                ])
        
    def generate_key(self, i, values):
        return values[1].split('@')[0]

    
loaders = [DataLoader]

generate_keyメソッドには何行目のデータであるかを表す i と、それぞれの行でのparseした後の文字列リスト values が渡される。
ここではemailがユニークなものとして、"@"の前の部分をとりだしてkey_nameに使用するよう指定した。


こうすると、同じデータをもう一度アップロードしても各行のデータは前回と同じキーになるため、同じデータで上書きされるだけになり、エンティティの数は変化しない。

自動的に割り振られたIDではなく、Key Nameが使われていることが確認できる

key_nameを元データに含めて使用する

元のCSVを変更して、格納するデータには含めないがkey_nameとして使用したい、というデータ列を追加してみる。

a,有田碧海,aritaouga@example.com,48
b,松本秀樹,matsumotohideki@example.com,30
c,谷本恵梨香,tanimotoerika@example.com,50
d,玉木誠治,tamakiseiji@example.com,20
e,飯島栄一,iijimaeiichi@example.com,70
...

こうするとcolumn数が4になり、アップロードしようとしても

AssertionError: Expected 3 columns, found 4.

となって失敗する。
が、この処理はgenerate_keyが呼び出されたあとに実行されるようなので、左端のcolumnをkey_nameに使用したあと削除してしまえば問題なくアップロードされる。

from google.appengine.tools import bulkloader
from model import PersonalData


class DataLoader(bulkloader.Loader):
    def __init__(self):
        bulkloader.Loader.__init__(self, 'PersonalData', [
                ('name', lambda x: x.decode('s-jis')),
                ('mail', str),
                ('age',  int),
                ])
        
    def generate_key(self, i, values):
        return values.pop(0)

    
loaders = [DataLoader]

generate_key内でvaluesの0番目をpopしている。こうすると結局valuesの要素数は3になるので残ったデータがアップロードに使用されることになる。

CSV以外の形式のデータをアップロードする

デフォルトでは、--filenameに指定してparseできるのはCSV形式のファイルとなっている。
これは、bulkloader.Loaderクラスのgenerate_recordsメソッドがそうしているから。

  def generate_records(self, filename):
    """Subclasses can override this to add custom data input code.

    This method must yield fixed-length lists of strings.

    The default implementation uses csv.reader to read CSV rows
    from filename.

    Args:
      filename: The string input for the --filename option.

    Yields:
      Lists of strings.
    """
    csv_generator = CSVGenerator(filename, openfile=self.__openfile,
                                 create_csv_reader=self.__create_csv_reader
                                ).Records()
    return csv_generator

従って、このメソッドをオーバーライドすることでCSV以外の形式のファイルに対してもアップロードが実行できる。

例:TwitterAPIから取得したJSONデータを使用してアップロードする

例えば以下のようなモデル定義をして、TwitterのPublicTimelineのデータをアップロードしたい、というとき。

from google.appengine.ext import db


class TimeLine(db.Model):
    name = db.StringProperty()
    text = db.TextProperty()
    time = db.DateTimeProperty()


TwitterのPublicTimelineはAPIから簡単に取得できる。が、CSV形式では取得できない。
でもわざわざアップロードするために他の形式で取得したものをCSVに変換するのも面倒。
generate_recodeをオーバーライドしてJSONファイルをそのまま使ってアップロードできるようにする。


generate_recordsメソッドは文字列の配列を返すgeneratorを返してやれば良いらしい。

import datetime
from django.utils import simplejson
from google.appengine.tools import bulkloader
from model import TimeLine


class DataLoader(bulkloader.Loader):
    def __init__(self):
        bulkloader.Loader.__init__(self, 'TimeLine', [
                ('name', str),
                ('text', lambda x: x.decode('utf-8')),
                ('time', lambda x: datetime.datetime.strptime(x, '%a %b %d %H:%M:%S +0000 %Y')),
                ])

    def generate_records(self, filename):
        file = open(filename)
        def generator():
            for status in simplejson.loads(file.read()):
                yield [
                    status['user']['screen_name'],
                    status['text'].encode('utf-8'),
                    status['created_at'],
                    ]
        return generator()
    
loaders = [DataLoader]

generatorとかyieldとかあんまりよく分かっていないので書き方おかしいかも知れないけど…
django.utilsのsimplejsonを使ってjsonファイルの内容をparseし、得られたリストからscreen_name, text, created_atを取り出してそれらのリストを返すgeneratorを定義し、それを返すようにgenerate_recodeをオーバーライドした。


こうすることで、APIから取得したデータをそのまま使って自分のアプリケーションにアップロードすることができる。

$ wget http://twitter.com/statuses/public_timeline.json 
$ PYTHONPATH=. appcfg.py upload_data --config_file=loader.py --filename=public_timeline.json --kind=TimeLine --url=http://localhost:8080/remote_api ../application


もちろん、重複アップロードを避けるために前述の方法を使ってIDを指定させることもできる。

import datetime
from django.utils import simplejson
from google.appengine.tools import bulkloader
from model import TimeLine


class DataLoader(bulkloader.Loader):
    def __init__(self):
        bulkloader.Loader.__init__(self, 'TimeLine', [
                ('name', str),
                ('text', lambda x: x.decode('utf-8')),
                ('time', lambda x: datetime.datetime.strptime(x, '%a %b %d %H:%M:%S +0000 %Y')),
                ])
        
    def generate_key(self, i, values):
        return 'id:' + str(values.pop(0))

    def generate_records(self, filename):
        file = open(filename)
        def generator():
            for timeline in simplejson.loads(file.read()):
                yield [
                    timeline['id'],
                    timeline['user']['screen_name'],
                    timeline['text'].encode('utf-8'),
                    timeline['created_at'],
                    ]
        return generator()
    
loaders = [DataLoader]

generatorが返す配列の0番目にtimelineのid要素を挿入。generate_keyメソッドをオーバーライドして、その値をpopして使用する。
ただしTwitterのtimelineのidは数字列なので、これをそのまま使用することはできない。generate_keyではkeyのid値を指定することはできず、nameの文字列を指定することしかできない。しかもその場合、文字列は数字で始まってはいけないので先頭に何らかの文字を繋げる必要がある。

例:TwitterAPIから取得したXMLデータを使用してアップロードする

JSONで出来るのだから、XMLだってparseさえ出来れば同じように使える。

import datetime
from xml.etree import ElementTree
from google.appengine.tools import bulkloader
from model import TimeLine


class DataLoader(bulkloader.Loader):
    def __init__(self):
        bulkloader.Loader.__init__(self, 'TimeLine', [
                ('name', str),
                ('text', lambda x: x.decode('utf-8')),
                ('time', lambda x: datetime.datetime.strptime(x, '%a %b %d %H:%M:%S +0000 %Y')),
                ])
        
    def generate_key(self, i, values):
        return 'id:' + str(values.pop(0))

    def generate_records(self, filename):
        file = open(filename)
        def generator():
            tree = ElementTree.parse(file)
            for status in tree.findall('status'):
                yield [
                    status.findtext('id'),
                    status.find('user').findtext('screen_name'),
                    status.findtext('text').encode('utf-8'),
                    status.findtext('created_at'),
                    ]
        return generator()
    
loaders = [DataLoader]

xml.etreeというものを使ってみた。これで、JSONと同じようにアップロードができるはず。

$ wget http://twitter.com/statuses/public_timeline.xml 
$ appcfg.py upload_data --config_file=loader.py --filename=public_timeline.xml --kind=TimeLine --url=http://localhost:8080/remote_api ../application
むしろファイルから読み込まなくてもアップロードできる

要するに必要なデータを返すgeneratorさえ返せば良いので、bulkloaderの実行時に初めてデータを取りに行ってそれを使う、ということも出来てしまう。

import datetime
import urllib
from xml.etree import ElementTree
from google.appengine.tools import bulkloader
from model import TimeLine


class DataLoader(bulkloader.Loader):
    def __init__(self):
        bulkloader.Loader.__init__(self, 'TimeLine', [
                ('name', str),
                ('text', lambda x: x.decode('utf-8')),
                ('time', lambda x: datetime.datetime.strptime(x, '%a %b %d %H:%M:%S +0000 %Y')),
                ])
        
    def generate_key(self, i, values):
        return 'id:' + str(values.pop(0))

    def generate_records(self, filename):
        url = urllib.urlopen('http://twitter.com/statuses/public_timeline.xml')
        def generator():
            tree = ElementTree.parse(url)
            for status in tree.findall('status'):
                yield [
                    status.findtext('id'),
                    status.find('user').findtext('screen_name'),
                    status.findtext('text').encode('utf-8'),
                    status.findtext('created_at'),
                    ]
        return generator()
    
loaders = [DataLoader]

generate_recordsに渡された引数filenameなんて無視して、その場でurllibからデータを取りに行ってそれを使っている。
こうすることで、アップロードするデータファイルを用意しなくてもアップロードが可能。
ただし、"appcfg.py upload_data"コマンドではオプションに"--filename=***"で存在するファイル名を指定しないと怒られてしまう。"."とかでお茶を濁しておけば問題なくアップロードできる。

$ PYTHONPATH=. appcfg.py upload_data --config_file=loader.py --filename=. --kind=TimeLine ../application

おしまい

まとめると、

  • bulkloaderは便利
  • Loaderクラスのメソッドオーバーライドで色々カスタマイズ可能

bulkloader.Loaderにはもう一つ、handle_entityというこれまたオーバーライドできるメソッドがあって、さらなるカスタマイズをしたいときなんかに使えるかも知れない。