October 20, 2015

Magellan: Geospatial Analytics on Spark

Geospatial data is pervasive—in mobile devices, sensors, logs, and wearables. This data’s spatial context is an important variable in many predictive analytics applications.

To benefit from spatial context in a predictive analytics application, we need to be able to parse geospatial datasets at scale, join them with target datasets that contain point-in-space information, and answer geometrical queries efficiently.

Unfortunately, if you are working with geospatial data and big data sets that need spatial context, few open source tools make it easy to parse and efficiently query spatial datasets at scale. This poses significant challenges for leveraging geospatial data in business intelligence and predictive analytics applications.

This is the problem that Magellan sets out to solve. Magellan is an open source library for Geospatial Analytics that uses Apache Spark as the underlying execution engine. Magellan facilitates geospatial queries and builds upon Spark to solve hard problems of dealing with geospatial data at scale.

In this blog post, we will introduce the problem of geospatial analytics and show how Magellan allows users to ingest geospatial data and run spatial queries at scale.

To do so, we will use Uber data to examine the flow of Uber traffic in the city of San Francisco.

Mapping the flow of Uber traffic in San Francisco with Magellan

Uber has published a dataset of GPS coordinates of all trips within San Francisco.

Our goal in this example is to join the Uber dataset with the San Francisco neighborhoods dataset to obtain some interesting insights into the patterns of Uber trips in San Francisco.

Magellan has both Scala and Python bindings. In this blog post we use the Scala APIs.
Magellan is a Spark Package and can be included when launching the Spark shell as follows:

            bin/spark-shell --packages harsha2010:magellan:1.0.3-s_2.10

The following imports are needed:

import magellan.{Point, Polygon, PolyLine}
import magellan.coord.NAD83
import org.apache.spark.sql.magellan.MagellanContext
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

First, we need to read the Uber dataset. We assume the dataset has been downloaded and the path to the dataset is uber.path.
Let us create a case class that attaches a schema to the Uber dataset, so we can use the DataFrame abstraction to deal with the data.

case class UberRecord(tripId: String, timestamp: String, point: Point)

Now we can read the dataset into a dataframe and cache the resulting dataframe.

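// ${uber.path} is a placeholder for the location of the downloaded dataset
val uber = sc.textFile(${uber.path}).map { line =>
  val parts = line.split("\t") // fields are tab-separated
  val tripId = parts(0)
  val timestamp = parts(1)
  // Point takes (x, y), that is (longitude, latitude)
  val point = Point(parts(3).toDouble, parts(2).toDouble)
  UberRecord(tripId, timestamp, point)
}.
  repartition(100).
  toDF().
  cache()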

This dataset contains the trip id, the timestamp and the latitude and longitude of each point on the trip coalesced into a Point data structure.

A Point is the simplest geometric data structure available in Magellan. It represents a two dimensional point, with x and y coordinates. In this case, as is standard in geospatial analysis, the x coordinate refers to the longitude and the y coordinate the latitude.
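To make the field order concrete, here is a minimal plain-Scala sketch of the same parsing step that runs without Spark or Magellan. SimplePoint is a hypothetical stand-in for Magellan's Point, and the field layout (trip id, timestamp, latitude, longitude) is an assumption taken from the parsing code above. Unlike that code, the sketch returns None for malformed lines instead of throwing.

```scala
object UberLineParsing {
  // Hypothetical stand-in for magellan.Point: x = longitude, y = latitude.
  final case class SimplePoint(x: Double, y: Double)

  // Assumed layout: tripId <TAB> timestamp <TAB> latitude <TAB> longitude
  def parse(line: String): Option[(String, String, SimplePoint)] =
    line.split("\t") match {
      case Array(tripId, timestamp, lat, lon) =>
        // Longitude goes first because it is the x coordinate.
        scala.util.Try(SimplePoint(lon.toDouble, lat.toDouble))
          .toOption
          .map(p => (tripId, timestamp, p))
      case _ => None // wrong number of fields
    }
}
```

For a made-up line such as "t1", a timestamp, "37.75" and "-122.40" separated by tabs, parse yields a point with x = -122.40 and y = 37.75.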

Since this dataset is not very interesting on its own, we need to enrich it by determining which neighborhood each of these points lies in.

To do so, we will convert the neighborhood dataset into a dataframe as well, assuming the dataset has been downloaded and the path to the dataset is neighborhoods.path.

This dataset is in what is known as the ESRI Shapefile format.

This is one of the most common formats in which geospatial data is stored. Magellan has a Data Source implementation that understands how to parse ESRI Shapefiles into Shapes and Metadata.

val magellanContext = new MagellanContext(sc)
val neighborhoods = magellanContext.read.format("magellan").
  load(${neighborhoods.path}).
  select($"polygon", $"metadata").
  cache()

There are two columns in this DataFrame: a shape representing the neighborhood which happens to be polygonal, and metadata which is a map of String keys and String values.

Magellan has a Polygon data structure to capture the spatial geometry of a Polygon. A Polygon in Magellan stands for a Polygonal object with zero or more holes.
Map columns can be exploded into their keys and values to yield the following dataframe:

neighborhoods.select(explode($"metadata").as(Seq("k", "v"))).show(5)

+----------+--------------------+
|         k|                   v|
+----------+--------------------+
|neighborho|Twin Peaks       ...|
|neighborho|Pacific Heights  ...|
|neighborho|Visitacion Valley...|
|neighborho|Potrero Hill     ...|
|neighborho|Crocker Amazon   ...|
+----------+--------------------+
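Conceptually, exploding a map column turns every key/value entry of every row into a row of its own. The following plain-Scala analogy, which uses ordinary collections rather than the Spark API, is only meant to illustrate that behavior; the object and method names are hypothetical.

```scala
object ExplodeSketch {
  // Each element plays the role of one row's "metadata" map.
  // The result has one (key, value) pair per map entry, in row order.
  def explode(rows: Seq[Map[String, String]]): Seq[(String, String)] =
    rows.flatMap(_.toSeq)
}
```

A row whose map holds a single entry therefore yields exactly one output row, which matches the table above, where each neighborhood contributes one key/value pair.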

Now we are getting somewhere: we are able to parse the San Francisco neighborhood dataset and extract its metadata as well as the polygon shapes that represent each neighborhood. The natural next step is to join this dataset with the Uber dataset so that each point on an Uber trip can be associated with its corresponding neighborhood.

Here we run into an important spatial query: how do we compute whether a given point (an Uber location) lies within a given polygon (a neighborhood)?
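One textbook way to answer this kind of query is even-odd ray casting: shoot a horizontal ray from the point and count how many polygon edges it crosses. The sketch below is a plain-Scala illustration of that technique under simplifying assumptions (a single ring, no holes, no special handling for points exactly on the boundary). It is not a description of how Magellan implements the test, and all names in it are hypothetical.

```scala
object PointInPolygon {
  type Pt = (Double, Double) // (x, y)

  /** Even-odd ray casting over one ring given as a sequence of vertices.
    * The ring may be open or closed (first vertex repeated at the end). */
  def within(p: Pt, ring: IndexedSeq[Pt]): Boolean = {
    val (px, py) = p
    var inside = false
    var j = ring.length - 1 // index of the previous vertex
    for (i <- ring.indices) {
      val (xi, yi) = ring(i)
      val (xj, yj) = ring(j)
      // The edge (j -> i) counts if it straddles the horizontal line
      // through p and the crossing lies to the right of p.
      val crosses = (yi > py) != (yj > py) &&
        px < (xj - xi) * (py - yi) / (yj - yi) + xi
      if (crosses) inside = !inside
      j = i
    }
    inside
  }
}
```

An odd number of crossings means the point is inside. Supporting holes would amount to requiring that the point is inside the outer ring and outside every hole ring.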

Magellan implements this as well as other spatial operators such as intersects, intersection, contains and covers, making them easy to use.
In Magellan, to join the Uber dataset with the San Francisco neighborhood dataset, you would issue the following Spark SQL query:

neighborhoods.
  join(uber).
  where($"point" within $"polygon").
  select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v"))).
  withColumnRenamed("v", "neighborhood").
  drop("k").
  show(5)

+------+---------+------------+
|tripId|timestamp|neighborhood|
+------+---------+------------+
+------+---------+------------+

This is interesting: According to our calculation, the GPS coordinates representing the Uber dataset do not fall in any of the San Francisco neighborhoods. How can this be?

This is a good point to pause and think about coordinate systems. We have been using GPS coordinates for the Uber dataset, but haven’t verified the coordinate system that the San Francisco neighborhood dataset has been encoded in.

It turns out that most datasets published by US government agencies use what are called State Plane coordinates.

Magellan supports translating between different coordinate systems by implementing a transformer interface which takes in Points and outputs Points.

This covers all conformal transformations, that is, all transformations that preserve angles.
In particular, to translate between WGS84, the GPS standard coordinate system used in the Uber dataset, and NAD83 Zone 403 (State Plane), we can use the following built-in transformer:

val transformer: Point => Point = (point: Point) => {
  // Apply the NAD83 State Plane transformation for Zone 403
  val from = new NAD83(Map("zone" -> 403)).from()
  val p = point.transform(from)
  // Scale the result from meters to feet
  new Point(3.28084 * p.x, 3.28084 * p.y)
}

Here we have defined a new transformer that applies the NAD83 transformation for Zone 403 (Northern California) and then scales the points so that their units are feet instead of meters.
This allows us to enhance the Uber dataset by adding a new column, nad83, that holds the scaled coordinates in the NAD83 State Plane Coordinate System:

val uberTransformed = uber.
  withColumn("nad83", $"point".transform(transformer)).
  cache()
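Because a transformer is just a function from points to points, transformers compose like ordinary functions. The plain-Scala sketch below illustrates the idea with coordinate pairs instead of Magellan's Point. The names Pt, Transformer and toStatePlaneFeet are hypothetical, and project stands in for a real projection such as the NAD83 one used above.

```scala
object TransformerSketch {
  type Pt = (Double, Double) // (x, y)
  type Transformer = Pt => Pt

  // Same meters-to-feet factor as in the transformer above.
  val FeetPerMeter = 3.28084

  // Uniform scaling preserves angles, so it can follow any conformal projection.
  val metersToFeet: Transformer = { case (x, y) => (FeetPerMeter * x, FeetPerMeter * y) }

  // `project` is a hypothetical stand-in for a projection that outputs meters.
  def toStatePlaneFeet(project: Transformer): Transformer =
    project.andThen(metersToFeet)
}
```

Keeping the projection and the unit conversion as separate steps makes it easy to swap either one, for example when a dataset is published in meters rather than feet.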

Now we are ready to perform the join again:

val joined = neighborhoods.
  join(uberTransformed).
  where($"nad83" within $"polygon").
  select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v"))).
  withColumnRenamed("v", "neighborhood").
  drop("k").
  cache()
joined.show(5)

+------+--------------------+--------------------+
|tripId|           timestamp|        neighborhood|
+------+--------------------+--------------------+
| 00002|2007-01-06T06:23:...|Marina           ...|
| 00006|2007-01-04T01:04:...|Marina           ...|
| 00008|2007-01-03T00:59:...|Castro/Upper Mark...|
| 00011|2007-01-06T09:08:...|Russian Hill     ...|
| 00014|2007-01-02T05:18:...|Mission          ...|
+------+--------------------+--------------------+

OK, this looks much more reasonable!
One interesting question we are now ready to ask is: which neighborhoods do the most Uber trips pass through?

joined.
  groupBy($"neighborhood").
  agg(countDistinct("tripId").as("trips")).
  orderBy(col("trips").desc).
  show(5)

+--------------------+-----+
|        neighborhood|trips|
+--------------------+-----+
|South of Market  ...| 9891|
|Western Addition ...| 6794|
|Downtown/Civic Ce...| 6697|
|Financial Distric...| 6038|
|Mission          ...| 5620|
+--------------------+-----+

There are 24664 trips for which we have neighborhood information, and close to 40% of them (9891) pass through SOMA. So if you are an Uber driver, you may just want to hang out around SOMA.
Breaking down this analysis by the neighborhood where trips originate reveals similarly interesting insights.

+--------------------+-----+
|  start_neighborhood|count|
+--------------------+-----+
|South of Market  ...| 5697|
|Financial Distric...| 3542|
|Downtown/Civic Ce...| 3258|
|Western Addition ...| 2632|
|Mission          ...| 2332|
|Marina           ...| 1003|
|Nob Hill         ...|  960|
|Pacific Heights  ...|  853|
|Castro/Upper Mark...|  749|
|Russian Hill     ...|  512|
+--------------------+-----+

Out of 24664 trips, 5697 originate in SOMA. That is, 23% of all the Uber trips start in SOMA.
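The query behind this table is not shown here. One plausible way to derive it, sketched with plain Scala collections rather than DataFrames, is to treat the earliest record of each trip as its starting point and count trips per starting neighborhood. The Visit case class is a hypothetical row shape mirroring the columns of the joined DataFrame, and the sketch assumes that all timestamps share one ISO-8601 format and offset, so that string order matches chronological order.

```scala
object StartNeighborhoods {
  // Hypothetical row shape mirroring the joined DataFrame's columns.
  final case class Visit(tripId: String, timestamp: String, neighborhood: String)

  /** Counts trips by the neighborhood of each trip's earliest record,
    * most frequent neighborhood first (ties broken by name). */
  def startCounts(visits: Seq[Visit]): Seq[(String, Int)] =
    visits
      .groupBy(_.tripId)
      .values
      .map(_.minBy(_.timestamp).neighborhood) // first record of each trip
      .groupBy(identity)
      .map { case (hood, trips) => (hood, trips.size) }
      .toSeq
      .sortBy { case (hood, n) => (-n, hood) }
}
```

Replacing minBy with maxBy gives the neighborhood where each trip ends.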
Another interesting question to ask is: what fraction of the Uber trips that originate in SOMA also end in SOMA?

+--------------------+-----+
|    end_neighborhood|count|
+--------------------+-----+
|South of Market  ...| 2259|
|Mission          ...|  911|
|Financial Distric...|  651|
|Downtown/Civic Ce...|  396|
|Western Addition ...|  380|
|Castro/Upper Mark...|  252|
|Potrero Hill     ...|  247|
|Nob Hill         ...|  101|
|Pacific Heights  ...|   56|
|Marina           ...|   51|
|Bernal Heights   ...|   48|
|North Beach      ...|   47|
|Haight Ashbury   ...|   45|
|Russian Hill     ...|   38|
|Chinatown        ...|   35|
|Noe Valley       ...|   32|
|Bayview          ...|   29|
|Treasure Island/Y...|   24|
|Golden Gate Park ...|   15|
|Inner Richmond   ...|   14|
+--------------------+-----+

That is, nearly 40% of all the trips that originate in SOMA (2259 of 5697) also end in SOMA: as far as Uber is concerned, what happens in SOMA stays in SOMA!
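The shares quoted in this section can be recomputed directly from the counts in the tables above. The short sketch below does only that arithmetic; the object and value names are hypothetical, and the counts are copied from the tables.

```scala
object TripShares {
  // Counts copied from the tables above.
  val totalTrips = 24664        // trips with neighborhood information
  val throughSoma = 9891        // trips passing through South of Market
  val startInSoma = 5697        // trips starting in South of Market
  val startAndEndInSoma = 2259  // trips starting and ending there

  def percent(part: Int, whole: Int): Double = 100.0 * part / whole

  def main(args: Array[String]): Unit = {
    println(f"pass through SOMA:       ${percent(throughSoma, totalTrips)}%.1f%%")
    println(f"start in SOMA:           ${percent(startInSoma, totalTrips)}%.1f%%")
    println(f"SOMA starts ending there: ${percent(startAndEndInSoma, startInSoma)}%.1f%%")
  }
}
```

Note that the last share is taken relative to the trips that start in SOMA, not relative to all trips.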

As we see, once we add geospatial context to the Uber dataset, we end up with a fascinating array of questions we can ask about the nature of Uber trips in the city of San Francisco.

Summary

In this blog post, we have shown how to use Magellan to perform geospatial analysis on Spark.

Hopefully this short introduction has demonstrated how easy and elegant it is to incorporate geospatial context in your applications using Magellan.

In the next blog post, we will go under the hood to examine how Magellan leverages Spark SQL, DataFrames and Catalyst to provide elegant and simple user APIs while ensuring that spatial queries execute efficiently.

Comments

  • When running the following:
    val neighborhoods = magellanContext.read.format("magellan").
    load(${neighborhoods.path}).
    select($"polygon", $"metadata").
    cache()

    I get this error message:
    <console>:33: error: value read is not a member of org.apache.spark.sql.magellan.MagellanContext
    val neighborhoods = magellanContext.read.format("magellan").

    I ll post this in a forum as well.

  • Started spark-shell like so:

    spark-shell --jars magellan-1.0.3-s_2.10.jar

    I get the error:

    scala> case class UberRecord(tripId: String, timestamp: String, point: Point)
    error: bad symbolic reference. A signature in Point.class refers to type DataType
    in package org.apache.spark.sql.types which is not available.
    It may be completely missing from the current classpath, or the version on
    the classpath might be incompatible with the version used when compiling Point.class.
    error: bad symbolic reference. A signature in Shape.class refers to type DataType
    in package org.apache.spark.sql.types which is not available.
    It may be completely missing from the current classpath, or the version on
    the classpath might be incompatible with the version used when compiling Shape.class.

    spark-shell --packages gives an error (Unknown option --package), so I used --jars. It is most likely the version (I am using Spark version 1.2.1).

  • Hi Ram, Thanks for the post.
    We are already doing this, all the traffic information goes into elastic cluster via stream processing.

    if I have a large elastic search cluster having all traffic data and if I write my data pipeline to create all geospatial transformations as elastic queries with plain spark, ( i.e. send the processing towards data) and store them back in elastic in aggregated fashion. Would like to hear your thoughts on , How do you think this approach and your approach of pulling the data towards processing compares in terms of me roundtrips ands on…..

    • Hi Ram, we are also studying geographic analysis with Spark. Could you talk about some of the differences among Magellan, GeoSpark and Spatial Spark? I'd also like to know your thinking about the future of Magellan, and whether it will become an important module of Spark just like GraphX, Streaming, MLlib and Spark SQL.

  • Ram, on page 8 you state that “No good transformation libraries.” Have you ever worked with the GDAL/OGR suite? This is a very robust and well supported open source set of tools and libraries for doing coordinate transformation. Some parts are also used by ESRI. David

    • Hi David

      Thanks for pointing out GDAL. I have also been looking at Apache SIS as a coordinate transformation engine.
      Magellan is designed in such a way that you can plug in any coordinate transformation engine into it. We might end up picking a default one if the licenses agree and the reference engine supports all the transformations we want

      Ram

  • I am trying this on spark 1.5 and getting errors in the following line. Any suggestions please.

    scala> val magellanContext = new MagellanContext(sc)

    java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/DataSourceStrategy$
    at org.apache.spark.sql.magellan.MagellanContext$$anon$1.<init>(MagellanContext.scala:35)
    at org.apache.spark.sql.magellan.MagellanContext.<init>(MagellanContext.scala:32)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC.<init>(<console>:37)

  • Is the Magellan project compatible with DataBricks?

    I am able to get as far as the following command (on Spark 1.4)

    val neighborhoods = magellanContext.read.format("magellan").
    load(${neighborhoods.path}).
    select($"polygon", $"metadata").
    cache()

    But when I try to neighborhood.show(), I get the following error:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 84, 10.49.249.128): java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected

    Any help is much appreciated. I was able to see the Uber points data just fine using df.take()

  • Hi Ram,
    Would it be possible to perform batch reverse geocoding with Magellan (converting latitude/longitude pairs into street addresses), using a spatial join between street line data and points, with some interpolation to find house numbers?
    Best Regards
    thanks in advance
    Pavel

  • I wanted to clarify, is the reasoning for the Shape to subclass DataType, so that you can represent these custom types, Point and Polygon, in a DataFrame?

  • Hi, may I ask what the following part should look like in Magellan version 1.0.4-s_2.11?
    I have tried many times but it still does not work. Thank you.

    val transformer: Point => Point = (point: Point) => {
    val from = new NAD83(Map("zone" -> 403)).from()
    val p = point.transform(from)
    new Point(3.28084 * p.x, 3.28084 * p.y)
    }

  • Hi,

    I’ve just discovered this great tool (Magellan version: 1.0.4-s_2.11) and I’m trying to launch your example. But I have this error message while creating a Polygon case class.

    case class PolygonRecord(polygon: Polygon)
    :11: error: not found: type Polygon

    I have the same error if with Point case class (other example). I tried without success the jar file as well… What I have to do ? Regards, Ian

  • I am trying this example on a spark 1.4.1 platform. When I do:
    val joined = neighborhoods.
    join(uberTransformed).
    where($"nad83" within $"polygon").
    select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v"))).
    withColumnRenamed("v", "neighborhood").
    drop("k").
    cache()

    I get java.lang.ArrayIndexOutOfBoundsException: 1
    Anyone out there have this error?
    Thanks
