Scenario: a Jupyter Notebook needs to load a module from a project's src directory for exploratory analysis, without copying and pasting the common module.

Jupyter Notebook runs in a separate Python process (ipykernel), so additional module-loading setup is necessary.

There are two approaches to loading, and during development dynamic reloading of a previously loaded module is also needed:

  1. sys-based
  2. importlib-based

Assume the layout src -> package -> module1.py,
and the notebook runs: from package.module1 import a, b, c

1. sys-based

import sys
import os
project_path = os.path.realpath('project_path')
src_path = os.path.join(project_path, 'src')
sys.path.insert(1, src_path)
import package.module
# or
from package import module

Notice: 'src' itself is what gets inserted into sys.path, so 'src' is not a name the import statement knows about;
i.e. `import src` fails with "No module named 'src'".
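To see the effect end to end, here is a self-contained sketch that builds a throwaway src/package/module1.py in a temp directory (the layout and names are hypothetical, chosen to match the example above):

```python
import os
import sys
import tempfile

# build <tmp>/src/package/module1.py on the fly
project_path = tempfile.mkdtemp()
src_path = os.path.join(project_path, 'src')
os.makedirs(os.path.join(src_path, 'package'))
with open(os.path.join(src_path, 'package', '__init__.py'), 'w') as f:
    f.write('')
with open(os.path.join(src_path, 'package', 'module1.py'), 'w') as f:
    f.write('a, b, c = 1, 2, 3\n')

sys.path.insert(1, src_path)          # 'src' goes on the path...
from package.module1 import a, b, c   # ...so 'package' is importable

print(a, b, c)                        # -> 1 2 3
try:
    import src                        # ...but 'src' itself is not
except ModuleNotFoundError as e:
    print(e)
```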

2. importlib-based

import importlib.util
import sys

# point the spec at the package's __init__.py under package_path
module_spec = importlib.util.spec_from_file_location('package', str(package_path / '__init__.py'))
module = importlib.util.module_from_spec(module_spec)
sys.modules['package'] = module
module_spec.loader.exec_module(module)

import package.module
# or
from package import module

Notice: the name 'package' must be identical to the package in `from package.module1 import a, b, c`,
because `sys.modules['package'] = module` registers the module in the module cache under that name, so the import statement can resolve the reference correctly.
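A runnable sketch of the importlib approach, again using a throwaway package built in a temp directory (pointing the spec at `__init__.py` and the exec_module call are my additions; the registered name 'package' matches the import below):

```python
import importlib.util
import os
import sys
import tempfile

# hypothetical package on disk: <tmp>/src/package/{__init__.py, module1.py}
src_path = tempfile.mkdtemp()
pkg_dir = os.path.join(src_path, 'package')
os.makedirs(pkg_dir)
with open(os.path.join(pkg_dir, '__init__.py'), 'w') as f:
    f.write('')
with open(os.path.join(pkg_dir, 'module1.py'), 'w') as f:
    f.write('a, b, c = 1, 2, 3\n')

# the spec name must match the name used in the import statement below
spec = importlib.util.spec_from_file_location(
    'package', os.path.join(pkg_dir, '__init__.py'))
module = importlib.util.module_from_spec(spec)
sys.modules['package'] = module       # register under the exact name
spec.loader.exec_module(module)

from package.module1 import a, b, c
print(a, b, c)
```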

Dynamically reloading a module during development

For this case, only the qualified form works:

import importlib
import package.module2
importlib.reload(package.module2)

With `from package import module2` instead, referring to `package.module2` in the reload call fails
because the name 'package' is not defined.

Reference to [doc]

If a module imports objects from another module using from … import …, calling reload() for the other module does not redefine the objects imported from it; one way around this is to re-execute the from statement, another is to use import and qualified names (module.name) instead.
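A minimal sketch of the reload cycle described in the quote, using a hypothetical module file that is edited on disk between import and reload:

```python
import importlib
import os
import sys
import tempfile

sys.dont_write_bytecode = True        # avoid a stale .pyc masking the edit

# hypothetical module file for illustration
mod_dir = tempfile.mkdtemp()
with open(os.path.join(mod_dir, 'mymod.py'), 'w') as f:
    f.write('VALUE = 1\n')
sys.path.insert(0, mod_dir)

import mymod
print(mymod.VALUE)                    # -> 1

# edit the source on disk, as you would during development
with open(os.path.join(mod_dir, 'mymod.py'), 'w') as f:
    f.write('VALUE = 222\n')

importlib.reload(mymod)
print(mymod.VALUE)                    # -> 222, visible via the qualified name

# by contrast, a name bound earlier with `from mymod import VALUE`
# would still refer to the old object after the reload
```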

Python Module Loading


Get your spark job running on spark job server with Mesos and Docker

Spark Job Server (SJS) has a README on how to run Spark jobs, but some details are missing when you want to quickly deploy a Spark Job Server as a Docker container and submit a job to Mesos with Spark Docker images. Writing the Spark job (a SQLContext job in this blog) requires certain configurations as well.

Components include Spark Job Server (as Docker, Spark 1.6.1 for Mesos 0.25.0), a Spark job (Scala), Mesos (0.25.0) and a Spark Docker image (1.6.1 with Mesos 0.25.0).

0. Get the default Spark Job Server docker running first

Docker is the most convenient way to start an SJS server. The only problem is that the SJS image is built with sbt-docker instead of a conventional Dockerfile. Later we will have to change build.sbt to customize the image, but before that let's run the default one as a test.

Following the project's Docker docs, we pulled the default image from Docker Hub and SJS started automatically.

docker run -d -p 8090:8090 velvia/spark-jobserver:0.6.2.mesos-0.28.1.spark-1.6.1

Obtain the word-count jar by cloning the repo (make sure you have sbt installed). Remember to check out the branch for version 0.6.2, not master. Run packaging at the project root:

sbt job-server-tests/package

Locate the jar in the generated target folder, then upload it and run the job:

curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests-$VER.jar localhost:8090/jars/test
curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
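The curl calls above are plain HTTP, so the submission can also be sketched with Python's standard library (host, appName and classPath copied from the curl example; the request is only constructed here, since actually sending it needs a running SJS):

```python
from urllib.parse import urlencode
from urllib.request import Request

params = urlencode({'appName': 'test',
                    'classPath': 'spark.jobserver.WordCountExample'})
req = Request('http://localhost:8090/jobs?' + params,
              data=b'input.string = a b c a b see')  # POST body, as with -d

# urllib.request.urlopen(req) would submit the job to a running SJS
print(req.get_method())   # a request with a body defaults to POST
```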

Obtain the extras jar for testing SQLContext (follow the doc):

sbt 'job-server-extras/package'
curl --data-binary @job-server-extras/job-server-extras/target/scala-2.10/job-server-extras_2.10-0.6.2-SNAPSHOT-tests.jar

create sql context
curl -d "" ''

curl -d "sql = \"select * from addresses limit 10\"" ''

If weird things happen when building the jars, run `sbt clean`.

1. Build a customized SJS docker

If you start the SJS docker by specifying

-e SPARK_MASTER=mesos:zk://

and submit to your Mesos cluster, it probably won't work out of the box, due to a different Mesos version or a Docker network problem. The symptom, seen from the Mesos cluster, is a framework that keeps trying to register with Mesos but fails immediately. Check the Mesos log, which is only available on the Mesos leader, not on the other nodes.

Shutdown failed on fd=117: Transport endpoint is not connected [107]

From SJS server log, you will see

ERROR LocalContextSupervisorActor [] [] - Exception after sending Initialize to JobManagerActor
akka.pattern.AskTimeoutException: Ask timed out on

The suggested fix: run the SJS docker with the HOST network (`--net=host`) if on AWS EC2, and submit the word-count job again. Observing the Mesos UI, you will see your word-count framework get registered, even though it will still fail for other reasons.

Note: if the problem persists, it is likely due to a firewall issue or connection traps (e.g. Windows as your development machine).

The registered framework still fails, with the following error:

Fetching URI ''
Failed to fetch '': A relative path was passed for the resource but the Mesos framework home was not specified. Please either provide this config option or avoid using a relative path
Failed to synchronize with slave (it's probably exited)

The URI refers to SPARK_EXECUTOR_URI, and SJS by default tries to start the server with the option


So this is a bug: if there is nothing to specify, the option should simply be removed.

To fix the bug, go to the start script under bin/, wrap the option as follows, and rebuild the docker.

# For Mesos: pass the executor URI only when it is actually set
if [ -n "$SPARK_EXECUTOR_URI" ]; then
  # ... append the spark.executor.uri option here (the exact line depends on the script version) ...
fi

To rebuild the docker, issue `sbt docker` at the project root. If you see

unknown shorthand flag: 'f' in -f

be sure to update the sbt-docker plugin to 1.4.0 to remove the deprecated flag: sbt-docker 1.3.0 uses the -f flag, which was deprecated by Docker 1.12.

By this point you have probably noticed that we have not told Mesos where the Spark distribution is. The slave error will be:

sh: 1: /usr/local/spark/bin/spark-class: not found

In fact, spark.mesos.executor.docker.image is required and must be passed as an SJS argument.

We create a script for our customized docker run. The idea is from a GitHub issue.

#!/usr/bin/env sh


docker run -d \
 -p 8090:8090 \
 --net host \
 app/ --conf spark.mesos.coarse=true --conf spark.mesos.executor.docker.image=${SPARK_IMAGE} --conf spark.mesos.executor.home=/opt/spark

Note: it is possible that the Mesos version inside the docker is not compatible with your Mesos cluster. To build the SJS docker against a compatible Mesos version, we have to customize the source: change the Mesos version in Dependencies.scala to a compatible version that the sbt-docker plugin can download from the Mesos repos, and then build the docker from that.

2. Run examples to test on Mesos

Pass the word count jar to SJS and execute the job as before, and you will see it is beautifully executing.

Pass the sql context jar to SJS, and define the sqlcontext as before, and you will see it is running like a charm.

3. Write your own spark job running on Mesos

Suppose you want to use a package like spark-avro for Avro, and access data from AWS S3. A case like this requires installing external dependencies.

To begin, I recommend using Maven to create a Spark project from an archetype.






        <name>Job Server Bintray</name>

You may need to exclude the following to avoid conflicts with spark-core's transitive dependencies.


In your code you need to set the following Hadoop configuration to use s3a://. By default these properties are not configured, and you will probably hit a NullPointerException with a less-than-helpful message; I found a blog post on this useful. Notice that I omit the access key and secret key, since I rely on the IAM role provided on my instances.

sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp")
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

Make your code run and pass unit tests, then package it as a jar with `mvn package`.


Since the AWS and Avro libraries are "provided", simply add the following for the SJS docker:

--packages com.databricks:spark-avro_2.10:2.0.1,org.apache.hadoop:hadoop-aws:2.7.1

Yeah, that's pretty much it. Happy Sparking.





Tornado and Concurrency

Reading the Tornado documentation did not grant me a basic understanding of asynchronous web service calls. After going back and forth between external examples and blog posts, here is a summary of my brief understanding of Tornado's concurrency model.

  1. Common multi-threaded web service frameworks, where one user request is given one dedicated thread, control the number of concurrent threads running on a web server. A thread is blocked once its request hits a blocking call (e.g. database access or an external blocking HTTP call), and waits until a result is returned and control comes back to the thread.
  2. Different from the above, Tornado uses an IOLoop: a single thread handles all requests (a server may have multiple IOLoops, but we do not discuss that case here). The IOLoop design makes asynchronous calls clearer: every asynchronous call is spawned or yielded from this single IOLoop, and eventually returns control to it. The IOLoop is created by the Application, a singleton that maintains global state.
  3. That Tornado makes asynchrony easier does not mean Tornado is asynchronous by default. By default, handlers are synchronous, which means other user requests will be blocked severely (since the IOLoop has only one thread) if the blocking factor is non-negligible: database calls, AWS calls, long-polling (WebSockets perhaps). To make a handler asynchronous for those blocking monsters, first annotate the handler method with @tornado.web.asynchronous and use callbacks, or with @tornado.gen.coroutine and use yield. Second, make your calls asynchronous or run them in an executor pool; the design considerations follow recommendations like these:
    1. Try to make the calling component asynchronous by using an asynchronous library (e.g. asynchronous database drivers) or AsyncHTTPClient for external calls.
    2. Make synchronous calls faster, e.g. by using a faster local database.
    3. If remote synchronous (blocking) calls must be used, use a ThreadPoolExecutor.

So what to do for blocking database drivers like Redshift? Redshift has no asynchronous query driver support, and on AWS we can't safely assume the synchronous connection is fast (DynamoDB may be fine).

Then we need to use a ThreadPoolExecutor.
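The pattern can be sketched with the standard library alone; Tornado's run_on_executor decorator does essentially this, and a coroutine handler can yield the resulting future (the query function and pool size here are made up for illustration):

```python
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)   # shared pool, sized to taste

def blocking_query(sql):
    time.sleep(0.1)          # stands in for a blocking Redshift round trip
    return [('row', 1)]

# The IOLoop thread submits the call and stays free; a worker thread blocks.
future = executor.submit(blocking_query, 'select 1')
print(future.result())       # in a handler you would `yield future` instead
```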




Avro Parquet Redshift and S3 with Spark

Here I summarize what I found along the way, while I have some time as my cluster is busy running.

What I have:

Raw input: Avro with many schemas mixed in one S3 bucket (this is bad; later I will tell you why).

Target: store data in Redshift for querying (this is fine. I could work on Parquet directly, but storing it in a database-like place saves a lot of trouble when exports and ad-hoc queries are badly needed in no time!)

So the choices of approach:

  1. Parse Avro to Parquet and use the Spark Parquet package to write into Redshift.
  2. Load Avro directly into Redshift via the COPY command.

Choice 2 is better than choice 1, because writing Parquet to Redshift is actually converted to Avro and written into S3 under the hood, so choice 1 requires two rounds of network I/O.

Choice 2 requires only one, but you need to create your table first (which is simple, and should be done anyway for better control over the schema).

What made me crazy:

  1. Mixing different Avro schemas in one S3 bucket is a bad, bad idea: current libraries cannot load different schemas in one call. They fail to identify the schema differences; instead, the Spark Parquet package throws a "schema column is not found" error, and the COPY command falsely matches the different schemas into a strange result.
  2. The Avro needs to be checked extensively; corrupted data will make the program cry. Redshift COPY's maxerror has no effect on corrupted data. Two types of errors were seen: 1. "Cannot init avro reader from s3 file: Incorrect sync bytes"; 2. "Invalid AVRO file found. Unexpected end of AVRO file." These correspond to the Spark Parquet package's 1. "Invalid Sync!" and 2. empty Avro data.
  3. For the COPY command, I use a manifest in S3. My advice: use a different key for each manifest, and don't try to overwrite an existing manifest. There are cases where you did not overwrite but appended! Once a manifest is appended to, Redshift will append those records as well: duplicates.
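A sketch of the fresh-key idea, assuming the standard Redshift COPY manifest format (bucket and prefix names are hypothetical):

```python
import json
import uuid

def build_manifest(s3_uris):
    # Redshift COPY manifest format: one entry per input file
    return {'entries': [{'url': u, 'mandatory': True} for u in s3_uris]}

# a fresh key per load, so an existing manifest object is never reused
manifest_key = 'manifests/load-%s.json' % uuid.uuid4().hex
body = json.dumps(build_manifest(['s3://my-bucket/data/part-00000.avro']))
print(manifest_key)
```

Upload `body` under `manifest_key` and point COPY at that key; since the key is unique per run, there is nothing to overwrite or accidentally append to.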




Walk Through of Mahout LDA 0.9 against Tweets on CDH 5.2.+

For the time being, only key points are listed.

  • Best tutorial ever: follow Tom's book and develop MapReduce programs in a regression way: unit test -> local file -> hdfs file -> multiple hdfs files.
  • Package dependencies with the maven-shade plugin.
  • Flume generates too many small files: 1170+ files. On my single CDH machine, MR takes over an hour to process them (about 23k tweets), producing one 2.2 MB sequence file with key tweetID and value text.
  • Create vectors using mahout seq2sparse (stopwords can be filtered out by maxDFPercent and using TF, ?? unknown).
  • Create rowids for mahout LDA to work (convert into matrix, ?? unknown).
  • Run mahout cvb. Beware: if you don't want to hit hdfs, remember to set MAHOUT_LOCAL to something so that mahout runs only in local mode.
  • Apply seqdumper to read intermediate sequence files, and vectordump to display the LDA output (topic-term displays the terms for a topic, whereas topic-document gives the document-topic matching).

Yarn related:

  • unset HADOOP_CLASSPATH (make sure Yarn will see the exact path as you are using. Your local Hadoop Classpath will not be able to transfer to Yarn)
  • sudo -u hdfs hadoop jar thePackagedJar.jar DriverClass
  • -fs hdfs://cdh-manager.localdomain (file:/// means local file system with respect to the current directory or absolute path)
  • -jt cdh-manager.localdomain:8032 (local means using local runner; this url pointing to Yarn’s resource manager)
  • /user/flume/tweets/2014/12/22/{13,14,15} (input arg[0] taking a list of directories as a set. For the full list of file-path patterns supported by Hadoop, search "hadoop fs path pattern")
  • /out-seq (output directory with respect to hdfs:/// root)

CDH may not be able to run your application for the Mahout steps above, because Mahout may contain several applications in a pipeline within its scripts. It can happen that a job gets stuck while requesting resources. My take: browse the Yarn configuration, and set all the memory- and vcore-related settings to their defaults (without worrying about the physical limitations).

  • mahout seqdumper -i <input file name, address depending on MAHOUT_HOME local or hdfs>
  • -o <a local file>
  • sudo -u hdfs mahout vectordump
  • -i /tweet-lda
  • -o /tmp/topic_terms
  • -p true (print out the key as well, delimited by tab)
  • -d /seqdir-sparse-lda/dictionary.file-* (the dictionary used for interpreting term)
  • -dt sequencefile (The dictionary file type (text|seqfile) is sequencefile)
  • -sort /tweet-lda (sort output key/value pairs of the vector entries in abs magnitude, ?? unknown)
  • -vs 10 (each topic to display the top 10 terms associated with this topic)
  • -ni 1000000 (max number of items, in this case the max number of topics)

Mahout driver classes use a lot of shorthand. Check what the options mean in the file driver.classes.default.props located under conf/.

1. sudo -u hdfs mahout seq2sparse -i /demo-out-seq -o /demo-seq2sparse -ow --maxDFPercent 85 --namedVector -wt TF
2. sudo -u hdfs mahout rowid -i /demo-seq2sparse/tf-vectors -o /demo-rowid-matrix
3. sudo -u hdfs mahout cvb -i /demo-rowid-matrix/matrix -o /demo-tweet-cvb -k 10 -ow -x 5 -dict /demo-seq2sparse/dictionary.file-0 -dt /demo-cvb-topics -mt /demo-cvb-model
4. sudo -u hdfs mahout vectordump -i /demo-tweet-cvb -o /tmp/demo-topic_terms -p true -d /demo-seq2sparse/dictionary.file-* -dt sequencefile -sort /demo-tweet-cvb -vs 10 -ni 1000000

When writing a MapReduce application, follow these steps.

  1. Unit test your Mapper and Reducer classes. First create unit tests on the individual class methods; second, use MRUnit to write driver tests on the mapper and reducer. The driver's configuration can be set to test various situations.
  2. Write your application driver test. You will need your driver class first, extending Configured and implementing Tool. In your test, invoke the driver's run method to actually run the application. It is somewhat like an integration test, since it hits a file system (local or HDFS). The purpose is to run a smaller real data set to see if your application works.
  3. The last step is cluster deployment testing. Use a relatively larger dataset and actually submit your driver application to the cluster.

Following the above steps gives you confidence when debugging; messing up the sequence will leave you with no idea where the possible error is.

Passing external files to MapReduce at runtime. It is common for a MapReduce application to depend on external configuration submitted at job-submission time. There are several approaches to achieve this.

  • The MapReduce Configuration is a channel for the job to communicate with mapper and reducer tasks. You can specify a key that the task checks at setup time: say, if "external.filename" points to a filename, the task knows it should read that file. But how do we make the file accessible? We can use the console's -files option, which adds local files to the task's classpath. At runtime the file name can be specified through the console option -D property=value.

sudo -u hdfs hadoop jar target/your.jar DriverClass -fs hdfs://cdh-manager.localdomain -jt cdh-manager.localdomain:8032 -files target/classes/ /user/flume/tweets/2015/01/09/11 /out-seq

  • Another method involves the application driver class reading the argument list: if a relevant argument is detected, call the MapReduce Job API's addFileToClassPath or addCacheFile. But this may require a visible HDFS path (i.e. the file has already been uploaded to HDFS, or a URI is available for Hadoop to fetch it). I am still investigating; the useful reference is the latter part of this tutorial.

Quick Run Spark with Docker in Mac OS

For the time being, I am just listing a brief guide.

1. Install boot2docker, the Linux core required to run Docker on a Mac.

2. Install the Spark docker image (just pull it). The image has Hadoop YARN inside as well.

3. In macOS, start boot2docker first, then note down and export the $DOCKER_HOST environment variable. In macOS, every docker client command requires this variable to connect to boot2docker's host.

4. After exporting the Docker environment in macOS, inspect Docker containers with `docker ps` (with -a or -l for latest), or `docker inspect CONTAINER_HASH`.

5. Now, in macOS, it is safe to run:

docker run -p 4040:4040 -p 8030:8030 -p 49707:49707 -p 50020:50020 -p 8042:8042 -p 50070:50070 -p 8033:8033 -p 8032:8032 -p 50075:50075 -p 22:22 -p 8031:8031 -p 8040:8040 -p 50010:50010 -p 50090:50090 -p 8088:8088 -i -t -h sandbox sequenceiq/spark:1.2.0 /etc/ -bash

I open the ports so that macOS can access them directly.

6. You can examine the ports opened in boot2docker with `sudo iptables -t nat -L -n` from inside boot2docker (enter it with `boot2docker ssh`), and make the hostname `sandbox` resolvable in macOS by adding it to /etc/hosts.

Despite Docker's learning curve, I have to say this is so far the most convenient and least resource-hungry way of deploying and testing a Spark application.

Downside: it is not a cluster, just one node.

Learning Notes: Software Architecture – Taking A Deeper Dive

Trade Off and ATAM:

Fulfilling all stakeholders' requirements is impossible.

Agile ATAM: design a conceptual architecture -> bring stakeholders in to define scenarios -> prioritize stakeholders' requirements -> help negotiate the priorities -> redesign -> ask relevant stakeholder representatives, including programmers, to negotiate/evaluate -> evolve the design, involving stakeholders early and often (to deal with scenario/technique changes).

Further reading: Software Architecture in Practice 3rd Edition. SEI digital library.

Continuous Delivery:

The waterfall approach does not get quick feedback; practice continuous delivery to get fast feedback by releasing new features constantly.

Holistic engineering: reduce friction, e.g. generating the database schema from source code. ??

Automate your development, testing and deployment; use automated tools or develop your own, e.g. Selenium, cucumber-puppet.

When automating, consider the time constraint by using a timer, and avoid yak shaving. Good enough now is better than perfect in the future.

Differentiate aspects among static, dynamic and production; develop working software as soon as possible, building vertical slices over time.

Hide a new feature until it is finished: prefer feature toggles over feature branches (sync as often as necessary to avoid conflicts and the merge ambush); see the Togglz platform and Google's trunk-based development.

Each change should be small and releasable: users don't like giant changes, but gradual ones, as at Facebook. Canary releasing uses a router to move selected users to the newer version; dark releasing uses time zones to release gradually, like Facebook chat; the strangler pattern: new changes obsolete a small percentage of old features and live alongside them, letting users switch to the new feature by themselves, like Cloudera's switch to its new configuration UI.

Walk Through of Installing Cloudera Manager on a Single Node

I am following the official guide for auto installation; while it seems easy at first glance, I had a rough time with it.

Before going on: CDH is a rather heavy Hadoop distribution. I use CDH for development, and so far my setups are (1) VMware Workstation on a 16 GB RAM Windows host, with CDH taking up to 8 GB on a single node, or (2) a Mac with 8 GB RAM, with a Vagrant-managed VirtualBox taking up to 4 GB and 2 vcores for a single node (more than one node leaves CDH non-functional).

Check whether you are behind a proxy, and enable the proxy as stated in the guide. In addition, once you reach the parcel-download phase, remember to configure the parcel-download proxy settings through the web browser. Confirm parcel download issues by checking /var/log/cloudera-scm-server/cloudera-scm-server.log.

There is no need to deal with Java yourself; just go with the default oracle-j2sdk.

Disable IPv6 by following the blog guide.

Find the IP address of the machine node and create an FQDN as suggested in the comments. And Cloudera requires more and more:

The hosts in a Cloudera Manager deployment must satisfy the following networking and security requirements:

  • Cluster hosts must have a working network name resolution system and correctly formatted /etc/hosts file. All cluster hosts must have properly configured forward and reverse host resolution through DNS. The /etc/hosts files must
    • Contain consistent information about hostnames and IP addresses across all hosts
    • Not contain uppercase hostnames
    • Not contain duplicate IP addresses

    A properly formatted /etc/hosts file should be similar to the following example:

    	localhost.localdomain	localhost
    	cluster-01
    	cluster-02
    	cluster-03

My takeaways: 1) avoid using the loopback interface; use IPs assigned through the eth0 interface. 2) make the hostname an FQDN as well, by using `sudo hostname <FQDN>` and saving the name in /etc/hostname to survive reboots.

Allow hosts to be resolvable from the local /etc/hosts file like this:

cat /etc/NetworkManager/dnsmasq.d/hosts.conf 

Disable firewall and iptable as in the guide.

CDH requires root access using a password or a private key. My take: using a password for the root user is easier. Do the following.

Make sure openssh-server is installed and started:

sudo apt-get install openssh-server

Give root a password and enable SSH root access:

sudo passwd root;

vi /etc/ssh/sshd_config;

PermitRootLogin yes

service ssh restart

Test SSH access as root:

ssh localhost

With the above configuration, installing Cloudera Manager and the cluster should work.

During cluster installation, if you need to retry from the web browser, you may have to manually remove the lock:

sudo rm /tmp/.scm_prepare_node.lock

If encountering any problem, you can always uninstall and get back to a clean state by following the uninstallation guide.

Note: during cluster installation, if the web browser shows no progress bar, something is wrong. Check the root access setup listed above.

======= after a running CDH, configurations to be continued =======
CDH calculates the settings (like memory allocation) for the host, but sometimes the configuration is not checked against the minimum requirements of the installed components.

For example, the test installation estimating Pi does not work unless the following Yarn memory settings are increased (weirdly enough, the Yarn logs point to nothing useful):

- Set the Container Memory (yarn.nodemanager.resource.memory-mb) to 4 GB
- Set the Java Heap Size of ResourceManager to 1 GB

The most useful trick is to check the non-default settings by switching to the new view.

======= running mahout example ======
When trying to execute `mahout seq2sparse -i reuters-out-seqdir/ -o reuters-out-seqdir-lda -ow --maxDFPercent 85 --namedVector` with MAHOUT_LOCAL set to something non-null (meaning running locally), you hit a guava library version mismatch:

Exception in thread "main" java.lang.NoSuchMethodError:

Mahout requires guava 16.0, while Hadoop v2 uses guava 11.0.

The solution is quite weird. I was simply going to turn on logging: reading CDH 5's mahout script, which points the mahout conf directory to /etc/mahout/conf.dist, I put a simple log4j properties file in that conf directory. Surprisingly, the guava problem was gone.

Learning Notes of Scala

Learning Scala is not easy, but it is interesting. I followed the Coursera course to understand Scala. My purposes in learning Scala are:

1. to understand how functional programming can help my daily tasks;

2. to make better use of Spark and be able to write Spark code;

3. to use Akka for concurrent programming, which is an advanced stage for Java programmers like me.

The fact is,

Non-determinism = parallel processing (concurrency and parallelism) + mutable state

People want to write code that stays within their control, but controlling time (synchronizing over time and removing unwanted interleavings) is difficult. So imperative programming is written from the machine's perspective, not for the sake of programmers; the main purpose of Scala for me is to learn how to write concurrent code easily.
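The equation above can be demonstrated in a few lines of Python: parallel threads plus shared mutable state is non-deterministic unless you synchronize (remove the lock and the printed count can fall short, differently on each run):

```python
import threading

counter = 0
lock = threading.Lock()

def add(n):
    global counter
    for _ in range(n):
        with lock:            # without this lock, `counter += 1` is a
            counter += 1      # read-modify-write race and updates get lost

threads = [threading.Thread(target=add, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)                # deterministic only because of the lock
```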

Get Your Own Playframework Working

I found the internet lacks guidance on how to run a customised Play Framework. This is what I found.


Notation: the base Play version, bv; the Play version your project is based on, pv; the sbt version specified in your Play project, sv; your customised Play version, cv.

0. Download the Play binaries and dependencies (maybe not required).

1. Download the source of the base version bv.

2. Change the source code of bv however you want.

3. Compile your Play as specified in the Play docs (must be the same bv); something like ./build publish-local etc. Your customised Play will be available in .ivy2/local.

4. Make sure your pv is the same as bv; otherwise do an upgrade according to the docs. Note: if pv differs from bv, some manual work is needed, such as changing sv, different build.sbt syntax, different Play commands (activator/play run), etc.

5. Make your project depend on your customised Play by adding:

resolvers ++= Seq(
  Resolver.file("Local Repository", file("/YOUR PATH/.ivy2/local"))(Resolver.ivyStylePatterns)
)


// Use the Play sbt plugin for Play projects
addSbtPlugin("" % "sbt-plugin" % "cv")

Note: cv is used by the sbt plugin to identify the customised Play build.