Reading AppEngine Datastore backups

It's easy to back up Datastore entities to Cloud Storage with the Datastore Admin tool; the resulting dataset is a set of LevelDB-formatted binary files, which you can restore just as easily with the same tool. However, if you want to analyze the exported data rather than merely restore it, there is an easy way to read the binary format:


from google.appengine.api.files import records
from google.appengine.datastore import entity_pb
from google.appengine.api import datastore

filename = "/file/path"
entities = []
raw = open(filename, 'rb')  # the backup files are binary; open in binary mode
reader = records.RecordsReader(raw)

for record in reader:
    # each record is a serialized EntityProto; decode it into an Entity
    entity_proto = entity_pb.EntityProto(contents=record)
    entity = datastore.Entity.FromPb(entity_proto)
    entities.append(entity)

The resulting objects are dictionary-like Entity instances, which you can operate on directly.
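For example, assuming the backup contains entities of a hypothetical Person kind with a name property, you can filter and inspect them like plain dictionaries:

for entity in entities:
    # Entity subclasses dict, keyed by property name;
    # 'Person' and 'name' are hypothetical example names
    if entity.kind() == 'Person':
        print entity.key(), entity['name']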

Writing a custom output writer for App Engine MapReduce

Google's appengine-mapreduce is a very handy tool if you want to move a large amount of data out of Cloud Datastore (a NoSQL store built on BigTable). However, the library only ships with output writers for Google Cloud Storage (an S3-like service), which means you have to find another way if you want to direct your data out of Google's cloud platform, say, to a relational database. Luckily, the library lets you build custom output writers that can write your data anywhere you specify, as long as you implement the OutputWriter interface.

Here is a custom output writer I used to direct data to a Postgres database:

import pg8000

# OutputWriter, _get_params and errors come from the appengine-mapreduce library
from mapreduce import errors
from mapreduce.output_writers import OutputWriter, _get_params


class PostgresOutputWriter(OutputWriter):
  """A custom MapReduce output writer that writes to a Postgres database."""

  def __init__(self, host=None, port=None, database=None, user=None, password=None): # pylint: disable=W0231
    self.host = host
    self.port = port
    self.database = database
    self.user = user
    self.password = password

  @classmethod
  def create(cls, mapreduce_state, shard_state):
    mapper_spec = mapreduce_state.mapreduce_spec.mapper
    params = _get_params(mapper_spec)
    return cls(host=params.get('host'),
               port=params.get('port'),
               database=params.get('database'),
               user=params.get('user'),
               password=params.get('password'))

  def write(self, data, ctx):
    pg_pool = ctx.get_pool('postgres_pool')
    if not pg_pool:
      pg_pool = _PostgresPool(ctx=ctx,
                              host=self.host,
                              port=self.port,
                              database=self.database,
                              user=self.user,
                              password=self.password)
      ctx.register_pool('postgres_pool', pg_pool)
    pg_pool.append(data)

  def to_json(self):
    return {
      "host": self.host,
      "port": self.port,
      "database": self.database,
      "user": self.user,
      "password": self.password
    }

  @classmethod
  def from_json(cls, state):
    return cls(host=state.get('host'),
               port=state.get('port'),
               database=state.get('database'),
               user=state.get('user'),
               password=state.get('password'))

  @classmethod
  def validate(cls, mapper_spec):
    required_params = ["host", "port", "database", "user", "password"]
    if mapper_spec.output_writer_class() != cls:
      raise errors.BadWriterParamsError("Output writer class mismatch")

    params = _get_params(mapper_spec)
    if not all([arg in params for arg in required_params]):
      raise errors.BadWriterParamsError("Output writer requires parameters [{}]".format(', '.join(required_params)))

    if not isinstance(params.get("port"), int):
      raise errors.BadWriterParamsError("Parameter 'port' must be integer.")

  @classmethod
  def init_job(cls, mapreduce_state):
    pass

  def finalize(self, ctx, shard_state):
    pass

  @classmethod
  def finalize_job(cls, mapreduce_state):
    pass

  @classmethod
  def get_filenames(cls, mapreduce_state):
    return []


class _PostgresPool(object):
  """A mutation pool that accumulate writes of PostgresOutputWriter."""

  PG_POOL_SIZE = 200

  def __init__(self, ctx=None, host=None, port=None, database=None, user=None, password=None):
    self._queries = []
    self._size = 0
    self._ctx = ctx
    self._conn = pg8000.connect(host=host, port=port, database=database,
                                user=user, password=password, ssl=True)

  def append(self, query):
    self._queries.append(query)
    self._size += 1
    if self._size > self.PG_POOL_SIZE:
      self.flush()

  def flush(self):
    if self._queries:
      cur = self._conn.cursor()
      for query in self._queries:
        cur.execute(query)
      cur.close()
      self._conn.commit()
    self._queries = []
    self._size = 0

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.flush()
    self._conn.close()

Note that the code above requires pg8000, a pure-Python driver for Postgres; a driver module with C extensions (e.g. psycopg2) won't work in App Engine's sandbox.
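To use the writer, point a mapper job at it via output_writer_spec and pass the connection settings as mapper parameters. Below is a minimal sketch using MapperPipeline; the module paths, handler, and connection values are placeholders for illustration (the handler is expected to yield SQL statement strings, since write() executes each data item as a query):

from mapreduce import mapreduce_pipeline

pipeline = mapreduce_pipeline.MapperPipeline(
    "export_to_postgres",
    # hypothetical handler that turns an entity into an INSERT statement
    handler_spec="main.entity_to_insert_statement",
    input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
    output_writer_spec="main.PostgresOutputWriter",
    params={
        "entity_kind": "main.MyModel",
        "host": "db.example.com",
        "port": 5432,
        "database": "mydb",
        "user": "writer",
        "password": "secret",
    },
    shards=8)
pipeline.start()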

Update:

This has been merged into the mapreduce repo: https://github.com/GoogleCloudPlatform/appengine-mapreduce/pull/51

Analysis on NYC subway ridership data with a Map-Reduce approach

NOTE: This is an optional final project for the Data Science course on Udacity.com. The project involves processing raw NYC subway ridership data. A raw data sample can be found at: https://www.dropbox.com/s/meyki2wl9xfa7yk/turnstile_data_master_with_weather.csv

 

The following pieces of code illustrate a map-reduce approach to analyzing the data: finding the maximum hourly entries at each station and the corresponding hour of the day. The raw data (turnstile_data_master_with_weather.csv) is a comma-separated file with a leading row-index column; here is a sample, spaced for readability:

UNIT DATEn TIMEn Hour DESCn ENTRIESn_hourly EXITSn_hourly maxpressurei maxdewpti mindewpti minpressurei meandewpti meanpressurei fog rain meanwindspdi mintempi meantempi maxtempi precipi thunder
0 R001 2011-05-01 01:00:00 1 REGULAR 0 0 30.31 42 35 30.23 39 30.27 0 0 5 50 60 69 0 0
1 R001 2011-05-01 05:00:00 5 REGULAR 217 553 30.31 42 35 30.23 39 30.27 0 0 5 50 60 69 0 0
2 R001 2011-05-01 09:00:00 9 REGULAR 890 1262 30.31 42 35 30.23 39 30.27 0 0 5 50 60 69 0 0
3 R001 2011-05-01 13:00:00 13 REGULAR 2451 3708 30.31 42 35 30.23 39 30.27 0 0 5 50 60 69 0 0

Here is a mapper that reads the source file and emits tab-delimited (\t) triples of station code, hourly entry count, and the corresponding hour:

import sys


def mapper():
    for line in sys.stdin:
        data = line.split(",")
        # skip the header row and malformed lines
        if len(data) != 22 or data[1] == 'UNIT':
            continue
        # emit: station code, hourly entries, hour of day
        print "{0}\t{1}\t{2}".format(data[1], data[6], data[4])

mapper()

And a reducer to aggregate the sorted mapper output into the final stats:

import sys


def reducer():
    max_entries = 0
    old_key = None
    hour_in_day = ''

    for line in sys.stdin:
        data = line.strip().split("\t")
        if len(data) != 3:
            continue
        this_key = data[0]
        entry = float(data[1])
        hour = data[2]

        # input is sorted by key, so a new key means the previous
        # station's stats are complete
        if old_key and old_key != this_key:
            print "{0}\t{1}\t{2}".format(old_key, hour_in_day, max_entries)
            max_entries = 0
            hour_in_day = ''

        old_key = this_key
        if max_entries == 0 or max_entries <= entry:
            max_entries = entry
            hour_in_day = hour

    # flush the last key
    if old_key is not None:
        print "{0}\t{1}\t{2}".format(old_key, hour_in_day, max_entries)

reducer()

Now we have a map-reduce model in place. We can run it on a Hadoop cluster with:

$ hadoop jar /path/to/hadoop-xxx-streaming.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output myoutputdir

or simply test it locally with the command below:

$ cat turnstile_data_master_with_weather.csv | python mapper.py | sort | python reducer.py

 

The output should be something like this:

R001 17 31213
R002 21 4295
R003 12 995
R004 12 2318
R005 12 2705
R006 12 2784
R007 12 1763
R008 12 1724
R009 12 1230

Once we have the final results, we can easily turn them into charts with ggplot or matplotlib. We can also modify the mapper and reducer to serve different purposes.
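For example, here is a minimal matplotlib sketch that plots the per-station maxima, assuming the reducer output above was saved to a hypothetical file reducer_output.tsv:

import matplotlib.pyplot as plt

stations = []
max_entries = []
with open('reducer_output.tsv') as f:  # hypothetical file holding the reducer output
    for line in f:
        station, hour, entries = line.strip().split('\t')
        stations.append(station)
        max_entries.append(float(entries))

plt.bar(range(len(stations)), max_entries)
plt.xticks(range(len(stations)), stations, rotation=90)
plt.ylabel('Max hourly entries')
plt.title('Peak hourly entries per station')
plt.show()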

A MySQL upgrade problem

Wanting a newer version of InnoDB, I upgraded our company's MySQL from 5.0 to 5.5, performing the upgrade through WHM. Everything looked fine, but when it finished the site went down, repeatedly showing Internal Server Error. I guessed the connection between PHP and MySQL had broken.

Checking the Apache error log with tail /etc/httpd/logs/error_log, I found the crucial lines:

error while loading shared libraries: libmysqlclient.so.15

Sure enough, a shared library was missing. Luckily a solution existed online: download the missing library and reload it, and everything worked again.

$ wget -O /usr/lib64/libmysqlclient.so.15 http://files.directadmin.com/services/es_5.0_64/libmysqlclient.so.15
$ chmod 755 /usr/lib64/libmysqlclient.so.15
$ ldconfig

enabling python cgi on ubuntu/apache 2

Steps:
1. Create a cgi directory
sudo mkdir /var/www/cgi-bin

2. Put all python scripts in the directory and make them executable
sudo chmod +x /var/www/cgi-bin/*.py

3. Tell Apache which directories it can execute CGI scripts in and how to handle a .py file

sudo vi /etc/apache2/sites-available/000-default
Edit the /cgi-bin/ part to be:

ScriptAlias /cgi-bin/ /var/www/cgi-bin/
<Directory "/var/www/cgi-bin">
AllowOverride None
Options +ExecCGI -MultiViews +SymLinksIfOwnerMatch
Order allow,deny
Allow from all
AddHandler cgi-script .py
AddHandler default-handler .html .htm
</Directory>

4. Restart Apache

sudo service apache2 restart
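To verify the setup, you can drop a minimal script like this hypothetical hello.py into /var/www/cgi-bin/ (made executable, per step 2) and open /cgi-bin/hello.py in a browser:

#!/usr/bin/env python
# A CGI response starts with headers, then a blank line, then the body.
print "Content-Type: text/html"
print
print "<html><body>Hello from Python CGI!</body></html>"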

updating a global variable in Thread

There isn't a direct way for a thread to modify a local variable of the enclosing main method: an anonymous inner class can only capture local variables that are final (or effectively final). A shared class field works around this.

public class Outer {

    private static class Records {
        public static int x;
    }

    public static void main(String[] args) throws Exception {
        Records.x = 1;
        Thread t1 = new Thread() {
            @Override
            public void run() {
                Records.x++; // mutate the shared field from the thread
            }
        };
        t1.start();
        t1.join(); // wait for the thread to finish

        System.out.println(Records.x); // prints 2
    }
}
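Note that t1.join() does double duty here: it waits for the thread to finish, and it establishes the happens-before relationship that guarantees main sees the incremented value. If multiple threads updated the field concurrently, a bare int would not be safe; you would want synchronization or java.util.concurrent.atomic.AtomicInteger.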

[php] cron parser to determine next runtime

Sample cron file:

# minute, hour, dom, month, dow

# minutely
*/1 *   *     * * php /app/site/cron/job1.php
*/1 15-23,0-5    *     * * php /app/site/cron/job2.php
*/2 *   *     * * php /app/site/cron/job3.php
*/5 *   *     * * php /app/site/cron/job4.php

# hourly
*/10 *   *     * * php /app/site/cron/job5.php
0    */2 *     * * php /app/site/cron/job6.php
10   *   *     * * php /app/site/cron/job7.php

# daily
10  0   *     * * php /app/site/cron/job8.php
25  0   *     * * php /app/site/cron/job9.php
30  0   *     * * php /app/site/cron/job10.php
45  10  */1   * * php /app/site/cron/job11.php
1   20  *     * * php /app/site/cron/job12.php
#15  18  *     * * php /app/site/cron/job13.php

# weekly/monthly
0   0   11,25 * * php /app/site/cron/job14.php

# daily emails
40 23 *        * *   php /app/site/cron/job15.php
20 0 *        * *   php /app/site/cron/job16.php
15 2 *        * 1-6   php -d "memory_limit=600M" /app/site/cron/job17.php
50 2 1-6,8-31 * *   php /app/site/cron/job18.php
0  3 *        * *   php /app/site/cron/job19.php

# weekly emails
48 0 *        * 2   php /app/site/cron/job20.php
50 1 1-6,8-31 * 7   php /app/site/cron/job21.php

# monthly emails
0  0 2        * *   php /app/site/cron/job22.php

Parser in PHP:

<?php

function fit($str, $num){
	if(strpos($str, ',') !== false){
		// comma-separated list: fits if any element fits
		$arr = explode(',', $str);
		foreach ($arr as $element) {
			if (fit($element, $num)) return true;
		}
		return false;
	}

	if(strpos($str, '-') !== false){ // range, e.g. '15-23'
		list($low, $high) = explode('-', $str);
		return $num >= (int)$low && $num <= (int)$high;
	}

	if(strpos($str, '/') !== false){ // step values, e.g. '*/5'
		list($pre, $pos) = explode('/', $str);
		if($pre == '*'){
			return $num % (int)$pos == 0;
		}
		return $num % (int)$pos == (int)$pre;
	}

	// base case: a plain number
	return (int)$str == $num;
}



function next_run_time($line){
	$time=time();
	//list($minute, $hour, $day, $month, $dow) = split(" ", $line);
	list($minute, $hour, $day, $month, $dow) = preg_split('/ +/', $line);
	//var_dump(array($minute, $hour, $day, $month, $dow));
	if($dow=='0') $dow=7;
	
	do{
		// i=minute, H=hour, d=day, n=month (1-12), N=ISO day of week (1-7)
		list( $now_minute, $now_hour, $now_day, $now_month, $now_dow ) = explode( " ", date("i H d n N", $time ) );

		if($month != '*'){
			if( !fit($month, $now_month) ){
				// jump to the first day of the next month
				$time = mktime( 0, 0, 0, (int)$now_month + 1, 1, date("Y",$time) );
				continue;
			}
		}
			
		if($day !='*'){
			if( !fit($day, $now_day) ){
				$now_day = (int)$now_day + 1;
				$time = mktime(0, 0, 0, $now_month, $now_day, date("Y",$time) );
				continue;
			}
		}
		
		if( $hour !='*'){
			if( !fit($hour, $now_hour) ){
				$now_hour = (int)$now_hour + 1;
				$time = mktime( $now_hour, 0, 0, $now_month, $now_day, date("Y",$time) );
				continue;
			}
		}
		
		if( $minute !='*'){
			if( !fit($minute, $now_minute) ){
				$now_minute = (int)$now_minute + 1; 
				$time = mktime( $now_hour, $now_minute, 0, $now_month, $now_day, date("Y",$time) );
				continue;
			}
		}
		
		if( $dow != '*'){
			if( !fit($dow, $now_dow) ){
				$now_day = (int)$now_day + 1;
				$time = mktime( 0, 0, 0, $now_month, $now_day, date("Y",$time) );
				continue;
			}
		}
		
		break;
		
	} while(true);
	
	return $time;
}



$filearray= file("/home/xiaolong/cron_sample");
foreach ($filearray as $line_num=>$line){
	if (substr(trim($line),0,1) =='#' ) {
		//commented line, do nothing 
	} 
	elseif (trim($line)==""){
		//empty line, do nothing
	}
	else {
		//print $line;
		$t=next_run_time(trim($line));
		print "\nThe next runtime for job [".$line."] is ".date( "m/d/Y H:i T", $t )."\n";
	}
	
}

String and MongoDB ObjectID casting in PHP

After fetching a document from MongoDB, I can use

print $doc['_id'];

to print what looks like a plain string id. This is misleading: it suggests you could query for a document by passing a string id, which can't be done. If you use

$cursor= $collection->find(array("_id" => $id)); //$id is string type input

you will get nothing. Instead, you have to use:

$cursor = $collection->find(array("_id" => new MongoId($id))); // you have to construct the MongoId object first

That is because $doc['_id'] is actually a MongoId object: MongoDB does not use a plain string as a document's unique identifier (even though it prints like one); it wraps it in a MongoId object.

an example of using cURL to post

The following code fragment is what I used to get an access_token from Instagram.

$token_url="https://api.instagram.com/oauth/access_token";

$fields= array('client_id'=>$app_id,
		'client_secret'=>$app_secret,
		'grant_type'=>'authorization_code',
		'redirect_uri'=>urlencode($my_url),
		'code'=>$code,
		);
$fields_string = '';
foreach($fields as $key=>$value)
	{ $fields_string .= $key.'='.$value.'&'; }
$fields_string = rtrim($fields_string, '&'); // rtrim returns the trimmed string; assign it back

$ch = curl_init();

// set the url, mark the request as a POST, and attach the POST data
curl_setopt($ch, CURLOPT_URL, $token_url);
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, $fields_string);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); // return the response body instead of echoing it

//execute post
$result = curl_exec($ch);
$info=curl_getinfo($ch);
curl_close($ch);

 

The CURLOPT_RETURNTRANSFER option is necessary for curl_exec() to return the response body (here, a JSON string); without it, the body is echoed directly and curl_exec() only returns true/false.

enabling apache mod rewrite on ubuntu

$ sudo a2enmod rewrite
Then restart Apache.

If that doesn't work, try:

$ sudo gedit /etc/apache2/sites-available/default
Change the AllowOverride value to "All", and restart Apache.
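Once AllowOverride is set to "All", a per-directory .htaccess file can carry rewrite rules. A minimal, hypothetical example that maps a pretty URL onto a script:

RewriteEngine On
# example only: serve /post/123 via /index.php?id=123
RewriteRule ^post/([0-9]+)$ index.php?id=$1 [L,QSA]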