It’s easy to back up Datastore entities to Cloud Storage with the Datastore admin tools; the resulting dataset is a set of LevelDB-formatted binary files, which you can restore just as easily using the same tool. However, if you want to analyze the exported data rather than merely restore it, there is an easy way to read the binary format:
from google.appengine.api.files import records
from google.appengine.datastore import entity_pb
from google.appengine.api import datastore

filename = "/file/path"
entities = []
raw = open(filename, 'rb')
reader = records.RecordsReader(raw)
for record in reader:
    entity_proto = entity_pb.EntityProto(contents=record)
    entity = datastore.Entity.FromPb(entity_proto)
    entities.append(entity)
The resulting objects are dictionary-like Entity instances, which you can operate on directly.
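Since each loaded entity behaves like a Python dict, ordinary dict operations apply. A minimal sketch, using plain dicts as stand-ins for the loaded entities (the 'status' property is a hypothetical example):

```python
# Sketch of post-processing the loaded entities. Plain dicts stand in
# for the dictionary-like Entity objects; 'status' is a hypothetical
# property name.
from collections import Counter

entities = [{'status': 'active'}, {'status': 'deleted'}, {'status': 'active'}]
counts = Counter(e['status'] for e in entities)
print(counts['active'])  # -> 2
```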
Google’s appengine-mapreduce is a very handy tool if you want to move large amounts of data out of Cloud Datastore (a NoSQL store built on Bigtable). However, the library only ships output writers for Google Cloud Storage (an S3-like service), which means you have to find another way if you want to direct your data out of Google’s cloud platform — say, into a relational database. Luckily, the library allows you to build custom output writers that write your data to, in theory, anywhere you specify, as long as you implement the OutputWriter interface.
Here is a custom output writer I used to direct data to a Postgres database:
import pg8000

from mapreduce import errors
from mapreduce.output_writers import OutputWriter, _get_params


class PostgresOutputWriter(OutputWriter):
    """A customized output writer for MapReduce."""

    def __init__(self, host=None, port=None, database=None, user=None, password=None):
        # pylint: disable=W0231
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password

    @classmethod
    def create(cls, mapreduce_state, shard_state):
        mapper_spec = mapreduce_state.mapreduce_spec.mapper
        params = _get_params(mapper_spec)
        return cls(host=params.get('host'),
                   port=params.get('port'),
                   database=params.get('database'),
                   user=params.get('user'),
                   password=params.get('password'))

    def write(self, data, ctx):
        pg_pool = ctx.get_pool('postgres_pool')
        if not pg_pool:
            pg_pool = _PostgresPool(ctx=ctx,
                                    host=self.host,
                                    port=self.port,
                                    database=self.database,
                                    user=self.user,
                                    password=self.password)
            ctx.register_pool('postgres_pool', pg_pool)
        pg_pool.append(data)

    def to_json(self):
        return {"host": self.host,
                "port": self.port,
                "database": self.database,
                "user": self.user,
                "password": self.password}

    @classmethod
    def from_json(cls, state):
        return cls(host=state.get('host'),
                   port=state.get('port'),
                   database=state.get('database'),
                   user=state.get('user'),
                   password=state.get('password'))

    @classmethod
    def validate(cls, mapper_spec):
        required_params = ["host", "port", "database", "user", "password"]
        if mapper_spec.output_writer_class() != cls:
            raise errors.BadWriterParamsError("Output writer class mismatch")
        params = _get_params(mapper_spec)
        if not all([arg in params for arg in required_params]):
            raise errors.BadWriterParamsError(
                "Output writer requires parameters [{}]".format(', '.join(required_params)))
        if not isinstance(params.get("port"), int):
            raise errors.BadWriterParamsError("Parameter 'port' must be integer.")

    @classmethod
    def init_job(cls, mapreduce_state):
        pass

    def finalize(self, ctx, shard_state):
        pass

    @classmethod
    def finalize_job(cls, mapreduce_state):
        pass

    @classmethod
    def get_filenames(cls, mapreduce_state):
        return []


class _PostgresPool(object):
    """A mutation pool that accumulates writes of PostgresOutputWriter."""

    PG_POOL_SIZE = 200

    def __init__(self, ctx=None, host=None, port=None, database=None, user=None, password=None):
        self._queries = []
        self._size = 0
        self._ctx = ctx
        self._conn = pg8000.connect(host=host, port=port, database=database,
                                    user=user, password=password, ssl=True)

    def append(self, query):
        self._queries.append(query)
        self._size += 1
        if self._size > self.PG_POOL_SIZE:
            self.flush()

    def flush(self):
        if self._queries:
            cur = self._conn.cursor()
            for query in self._queries:
                cur.execute(query)
            cur.close()
            self._conn.commit()
        self._queries = []
        self._size = 0

    def __enter__(self):
        return self

    def __exit__(self, atype, value, traceback):
        self.flush()
        self._conn.close()
Note that the code above requires 'pg8000', a pure-Python driver for Postgres — a driver with C extensions (e.g. 'psycopg2') won't work on App Engine.
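With this writer, each mapper call should yield a complete SQL statement for the pool to execute. A minimal sketch of such a mapper — the table name, columns, and entity properties here are hypothetical:

```python
# Sketch of a mapper emitting SQL INSERT statements for the writer to
# execute. The 'visits' table and the 'user_id'/'ts' entity properties
# are hypothetical placeholders. Values are interpolated into a plain
# SQL string, so in real use they must be escaped/validated first.
def visit_to_sql(entity):
    # entity is dict-like, as in the Datastore examples above
    return ("INSERT INTO visits (user_id, ts) VALUES ({0}, '{1}')"
            .format(int(entity['user_id']), entity['ts']))

print(visit_to_sql({'user_id': 42, 'ts': '2014-01-01'}))
# -> INSERT INTO visits (user_id, ts) VALUES (42, '2014-01-01')
```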
Update:
This has been merged to the mapreduce repo: https://github.com/GoogleCloudPlatform/appengine-mapreduce/pull/51
NOTE: This is an optional final project for the Data Science course on Udacity.com. The project involves processing raw data on the NYC subway’s ridership. A raw data sample can be found at: https://www.dropbox.com/s/meyki2wl9xfa7yk/turnstile_data_master_with_weather.csv
The following pieces of code illustrate a map-reduce approach to analyzing the data and finding the maximum hourly entries at each station, along with the corresponding hour of the day. The raw data (turnstile_data_master_with_weather.csv) has the following format:
(index) | UNIT | DATEn | TIMEn | Hour | DESCn | ENTRIESn_hourly | EXITSn_hourly | maxpressurei | maxdewpti | mindewpti | minpressurei | meandewpti | meanpressurei | fog | rain | meanwindspdi | mintempi | meantempi | maxtempi | precipi | thunder
0 | R001 | 2011-05-01 | 01:00:00 | 1 | REGULAR | 0 | 0 | 30.31 | 42 | 35 | 30.23 | 39 | 30.27 | 0 | 0 | 5 | 50 | 60 | 69 | 0 | 0 |
1 | R001 | 2011-05-01 | 05:00:00 | 5 | REGULAR | 217 | 553 | 30.31 | 42 | 35 | 30.23 | 39 | 30.27 | 0 | 0 | 5 | 50 | 60 | 69 | 0 | 0 |
2 | R001 | 2011-05-01 | 09:00:00 | 9 | REGULAR | 890 | 1262 | 30.31 | 42 | 35 | 30.23 | 39 | 30.27 | 0 | 0 | 5 | 50 | 60 | 69 | 0 | 0 |
3 | R001 | 2011-05-01 | 13:00:00 | 13 | REGULAR | 2451 | 3708 | 30.31 | 42 | 35 | 30.23 | 39 | 30.27 | 0 | 0 | 5 | 50 | 60 | 69 | 0 | 0 |
…
Here is a mapper that reads the source file and emits triples of station code, hourly entry count, and the corresponding hour, delimited by tabs (\t).
import sys

def mapper():
    for line in sys.stdin:
        data = line.split(",")
        if len(data) != 22 or data[1] == 'UNIT':
            continue
        x = "{0}\t{1}\t{2}".format(data[1], data[6], data[4])
        print x

mapper()
And a reducer to reduce the results into final stats.
import sys

def reducer():
    max_entries = 0
    old_key = None
    hour_in_day = ''
    for line in sys.stdin:
        data = line.strip().split("\t")
        if len(data) != 3:
            continue
        this_key = data[0]
        entry = float(data[1])
        hour = data[2]
        if old_key and old_key != this_key:
            print "{0}\t{1}\t{2}".format(old_key, hour_in_day, max_entries)
            max_entries = 0
            hour_in_day = ''
        old_key = this_key
        if max_entries == 0 or max_entries <= entry:
            max_entries = entry
            hour_in_day = hour
    if old_key is not None:
        print "{0}\t{1}\t{2}".format(old_key, hour_in_day, max_entries)

reducer()
Now we have a map-reduce model in place. We can run it on a Hadoop cluster with
$ hadoop jar /path/to/hadoop-xxx-streaming.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input myinput -output myoutputdir
or simply test it locally using the command below:
$ cat turnstile_data_master_with_weather.csv | python mapper.py | sort | python reducer.py
The output should be something like this:
R001	17	31213
R002	21	4295
R003	12	995
R004	12	2318
R005	12	2705
R006	12	2784
R007	12	1763
R008	12	1724
R009	12	1230
…
Once we have the final results, we can easily turn them into charts with ggplot or matplotlib. We can also modify the mapper and reducer to fit a different purpose.
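The charting step can be sketched like this — the sample lines below mirror the reducer output above, and reading them from a file is omitted for brevity:

```python
# Sketch of parsing the reducer output ("station<TAB>hour<TAB>max_entries")
# and charting it. The sample lines stand in for the real output file.
output_lines = [
    "R001\t17\t31213.0",
    "R002\t21\t4295.0",
    "R003\t12\t995.0",
]

stations, hours, entries = [], [], []
for line in output_lines:
    station, hour, max_entries = line.split("\t")
    stations.append(station)
    hours.append(int(hour))
    entries.append(float(max_entries))

try:
    import matplotlib
    matplotlib.use("Agg")  # render without a display
    import matplotlib.pyplot as plt
    plt.bar(range(len(stations)), entries)
    plt.xticks(range(len(stations)), stations)
    plt.ylabel("max hourly entries")
    plt.savefig("max_entries.png")
except ImportError:
    pass  # matplotlib not installed; the parsed lists are still usable
```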
Wanting a newer version of InnoDB, I upgraded our company’s MySQL from 5.0 to 5.5; the upgrade was done through WHM. Everything looked fine, but afterwards the website went down, repeatedly showing “Internal Server Error”. I guessed the connection between PHP and MySQL was broken.
Checking the Apache error log with tail /etc/httpd/logs/error_log, I found the crucial lines:
“error while loading shared libraries: libmysqlclient.so.15”
So a library was indeed missing. Fortunately there is a fix available online: download the missing library and reload it.
$ wget -O /usr/lib64/libmysqlclient.so.15 http://files.directadmin.com/services/es_5.0_64/libmysqlclient.so.15
$ chmod 755 /usr/lib64/libmysqlclient.so.15
$ ldconfig
Steps:
1. Create a cgi directory
sudo mkdir /var/www/cgi-bin
2. Put all python scripts in the directory and make them executable
sudo chmod +x /var/www/cgi-bin/*.py
3. Tell Apache which directories it can execute scripts in and how to handle a .py file
sudo vi /etc/apache2/sites-available/000-default
Edit the /cgi-bin/ part to be:
ScriptAlias /cgi-bin/ /var/www/cgi-bin/
<Directory "/var/www/cgi-bin">
AllowOverride None
Options +ExecCGI -MultiViews +SymLinksIfOwnerMatch
Order allow,deny
Allow from all
AddHandler cgi-script .py
AddHandler default-handler .html .htm
</Directory>
4. Restart Apache
sudo service apache2 restart
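With the configuration above, a minimal test script (a hypothetical hello.py placed in /var/www/cgi-bin) could look like this; the blank line separating the headers from the body is required by the CGI protocol:

```python
#!/usr/bin/env python
# Minimal CGI script: Apache runs it and sends its stdout to the browser.

def render():
    # the blank line after the header is mandatory in CGI responses
    header = "Content-Type: text/html"
    body = "<html><body><h1>Hello from CGI</h1></body></html>"
    return header + "\n\n" + body

print(render())
```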
Looks like there isn’t a direct way to change a variable in the main method from a running thread. We can use a class object as a shared holder in this situation.
public class Outer {
    private static class Records {
        public static int x;
    }

    public static void main(String[] args) throws Exception {
        Records.x = 1;
        Thread t1 = new Thread() {
            @Override
            public void run() {
                Records.x++;
            }
        };
        t1.start();
        t1.join(); // wait for the thread to finish
        // Records.x is now 2
    }
}
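For comparison, the same shared-holder idea works in Python’s threading module as well — a sketch, not part of the original Java solution:

```python
# Same idea in Python: a mutable holder object lets a worker thread
# update state that the main thread reads back after join().
import threading

class Records:
    x = 0

Records.x = 1

def work():
    Records.x += 1

t1 = threading.Thread(target=work)
t1.start()
t1.join()  # wait for the thread to finish
print(Records.x)  # -> 2
```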
Sample cron file:
# minute, hour, dom, month, dow

# minutely
*/1 * * * * php /app/site/cron/job1.php
*/1 15-23,0-5 * * * php /app/site/cron/job2.php
*/2 * * * * php /app/site/cron/job3.php
*/5 * * * * php /app/site/cron/job4.php

# hourly
*/10 * * * * php /app/site/cron/job5.php
0 */2 * * * php /app/site/cron/job6.php
10 * * * * php /app/site/cron/job7.php

# daily
10 0 * * * php /app/site/cron/job8.php
25 0 * * * php /app/site/cron/job9.php
30 0 * * * php /app/site/cron/job10.php
45 10 */1 * * php /app/site/cron/job11.php
1 20 * * * php /app/site/cron/job12.php
#15 18 * * * php /app/site/cron/job13.php

# weekly/monthly
0 0 11,25 * * php /app/site/cron/job14.php

# daily emails
40 23 * * * php /app/site/cron/job15.php
20 0 * * * php /app/site/cron/job16.php
15 2 * * 1-6 php -d "memory_limit=600M" /app/site/cron/job17.php
50 2 1-6,8-31 * * php /app/site/cron/job18.php
0 3 * * * php /app/site/cron/job19.php

# weekly emails
48 0 * * 2 php /app/site/cron/job20.php
50 1 1-6,8-31 * 7 php /app/site/cron/job21.php

# monthly emails
0 0 2 * * php /app/site/cron/job22.php
Parser in PHP:
<?php
function fit($str, $num){
    if(strpos($str, ',') !== false){ // a list like '11,25'
        foreach (explode(',', $str) as $element) {
            if (fit($element, $num)) return true;
        }
        return false;
    }
    if(strpos($str, '-') !== false){ // a range like '1-6'
        list($low, $high) = explode('-', $str);
        return $num >= (int)$low && $num <= (int)$high;
    }
    if(strpos($str, '/') !== false){ // a step like '*/5'
        list($pre, $pos) = explode('/', $str);
        if($pre == '*'){
            return $num % (int)$pos == 0;
        }
        return $num % (int)$pos == (int)$pre;
    }
    // base case: a literal value
    return (int)$str == $num;
}

function next_run_time($line){
    $time = time();
    list($minute, $hour, $day, $month, $dow) = preg_split('/ +/', $line);
    if($dow == '0') $dow = 7; // date("N") uses 7 for Sunday
    do{
        list($now_minute, $now_hour, $now_day, $now_month, $now_dow) =
            explode(" ", date("i H d n N", $time));
        if($month != '*' && !fit($month, $now_month)){
            // jump to the first day of the next month
            $time = mktime(0, 0, 0, (int)$now_month + 1, 1, date("Y", $time));
            continue;
        }
        if($day != '*' && !fit($day, $now_day)){
            $time = mktime(0, 0, 0, $now_month, (int)$now_day + 1, date("Y", $time));
            continue;
        }
        if($hour != '*' && !fit($hour, $now_hour)){
            $time = mktime((int)$now_hour + 1, 0, 0, $now_month, $now_day, date("Y", $time));
            continue;
        }
        if($minute != '*' && !fit($minute, $now_minute)){
            $time = mktime($now_hour, (int)$now_minute + 1, 0, $now_month, $now_day, date("Y", $time));
            continue;
        }
        if($dow != '*' && !fit($dow, $now_dow)){
            $time = mktime(0, 0, 0, $now_month, (int)$now_day + 1, date("Y", $time));
            continue;
        }
        break;
    } while(true);
    return $time;
}

$filearray = file("/home/xiaolong/cron_sample");
foreach ($filearray as $line_num => $line){
    if (substr(trim($line), 0, 1) == '#') {
        // commented line, do nothing
    } elseif (trim($line) == "") {
        // empty line, do nothing
    } else {
        $t = next_run_time(trim($line));
        print "\nThe next runtime for job [".$line."] is ".date("m/d/Y H:i T", $t)."\n";
    }
}
When I fetch a document from MongoDB, I can use
print $doc['_id']
to print out a string-like id. This can be misleading, since you might assume you can use a plain string id to query a document.
This can’t be done. If you use
$cursor= $collection->find(array("_id" => $id)); //$id is string type input
you will get nothing. Instead, you have to use:
$cursor= $collection->find(array("_id" => new MongoID($id))); //you have to construct the Object first.
That's because $doc['_id'] is actually a MongoID object. MongoDB does not use a plain string to represent the unique identifier of a document (although it essentially is one); it wraps it in a MongoID object.
The following code fragment is what I used to get an access_token from Instagram.
$token_url = "https://api.instagram.com/oauth/access_token";
$fields = array(
    'client_id'     => $app_id,
    'client_secret' => $app_secret,
    'grant_type'    => 'authorization_code',
    'redirect_uri'  => urlencode($my_url),
    'code'          => $code,
);

$fields_string = '';
foreach($fields as $key => $value) {
    $fields_string .= $key.'='.$value.'&';
}
$fields_string = rtrim($fields_string, '&'); // rtrim returns the result; it does not modify in place

// set the url, number of POST vars, POST data
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $token_url);
curl_setopt($ch, CURLOPT_POST, count($fields));
curl_setopt($ch, CURLOPT_POSTFIELDS, $fields_string);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);

// execute post
$result = curl_exec($ch);
$info = curl_getinfo($ch);
curl_close($ch);
The line curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); is necessary for curl_exec() to return the response body (the JSON string); otherwise it only returns a status (true/false).
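Concatenating the fields by hand and trimming the trailing '&' is exactly what a URL-encoding helper does for you. For comparison, here is the same POST body built in Python (the credential values are placeholders):

```python
# Sketch of building the same token-exchange body in Python. The
# credential values ('APP_ID', etc.) are hypothetical placeholders.
import urllib.parse

fields = {
    'client_id': 'APP_ID',
    'client_secret': 'APP_SECRET',
    'grant_type': 'authorization_code',
    'redirect_uri': 'https://example.com/callback',
    'code': 'CODE',
}
# urlencode handles the '&'-joining, the escaping, and leaves no
# trailing separator to trim
body = urllib.parse.urlencode(fields)
print(body)
```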
$ sudo a2enmod rewrite
then restart Apache.
If that doesn’t work, try:
$ sudo gedit /etc/apache2/sites-available/default
change the AllowOverride value to “All”, and restart Apache.