
Spark SQL Connector

Overview

Using the Osm4scala Spark SQL Connector, reading an OSM pbf file from PySpark, Spark Scala, Spark SQL or SparkR is as easy as writing .read.format("osm.pbf").

The current implementation offers:

  • Spark 2 and 3 versions, with Scala 2.11 and 2.12
  • Full Spark SQL integration.
  • Easy schema.
  • Internal optimizations, like:
    • Transparent parallelism when reading multiple pbf files.
    • File splitting to increase parallelism per pbf file.
    • Pushdown of required columns (see the sketch below).
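For example, the column pushdown means that if you select only a few columns, only those are requested from the reader. A minimal sketch (path placeholder as in the other examples in this page):

Scala
val coordsDF = spark.read
  .format("osm.pbf")
  .load("<osm files path here>")
  .select("id", "latitude", "longitude") // only these columns are pushed down to the reader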

The library is distributed via the Maven repository in two different flavours: shaded (all-in-one fat jar) and non-shaded.

Options

This is the list of options available when creating a dataframe:

Option   Default   Possible values   Description
split    true      true / false      If false, Spark will not split pbf files, so parallelization will be per file.

Example from the Spark Shell:

Scala
scala> val osmDF = spark.read.format("osm.pbf").option("split", "false").load("<osm files path here>")
PySpark
>>> osmDF = spark.read.format("osm.pbf").option("split", "false").load("<osm files path here>")
SQL
spark-sql> CREATE TABLE osm USING osm.pbf OPTIONS ( 'split' = 'false' ) LOCATION '<osm files path here>';
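To see the effect of the split option (a sketch; the exact partition counts depend on your files and cluster), you can compare the number of input partitions with and without splitting:

Scala
scala> val splitDF = spark.read.format("osm.pbf").load("<osm files path here>")
scala> val perFileDF = spark.read.format("osm.pbf").option("split", "false").load("<osm files path here>")
scala> println(s"with split: ${splitDF.rdd.getNumPartitions}, one partition per file: ${perFileDF.rdd.getNumPartitions}")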

Schema definition

The DataFrame schema used is the following:

root
|-- id: long (nullable = true)
|-- type: byte (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- nodes: array (nullable = true)
| |-- element: long (containsNull = true)
|-- relations: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- relationType: byte (nullable = true)
| | |-- role: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- info: struct (nullable = true)
| |-- version: integer (nullable = true)
| |-- timestamp: timestamp (nullable = true)
| |-- changeset: long (nullable = true)
| |-- userId: integer (nullable = true)
| |-- userName: string (nullable = true)
| |-- visible: boolean (nullable = true)
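
The tree above is the output of printSchema, so you can reproduce it from your own session (using an osmDF dataframe created as in the examples in this page):

Scala
scala> osmDF.printSchema()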

Where the column type could be:

Value   Meaning
0       Node
1       Way
2       Relation

Where the column relationType could be:

Value   Meaning
0       Node
1       Way
2       Relation
3       Unrecognized
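
As a small sketch of how to work with these codes without magic numbers (using an osmDF dataframe from the examples below; the mapping follows the tables above):

Scala
import org.apache.spark.sql.functions.{col, when}

val withTypeName = osmDF.withColumn("typeName",
  when(col("type") === 0, "Node")
    .when(col("type") === 1, "Way")
    .otherwise("Relation"))

withTypeName.groupBy("typeName").count().show()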

All in one jar

Usually, osm4scala is used from the Spark Shell or from a notebook. For these cases, to simplify adding the connector as a dependency, there is a shaded fat jar version with all the necessary dependencies. The fat jar is around 5MB, so its size should not be a problem.

As you probably know, Spark is based on Scala, and different Spark distributions use different Scala versions. These are the Spark/Scala version combinations available for the latest release, v1.0.11:

Spark branch   Scala   Package
2.4            2.11    com.acervera.osm4scala:osm4scala-spark2-shaded_2.11:1.0.11
2.4            2.12    com.acervera.osm4scala:osm4scala-spark2-shaded_2.12:1.0.11
3.0 / 3.1      2.12    com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11

Although the following sections focus on the Spark Shell and notebooks, you can use the same technique in any other situation where you want to use the shaded version.

Why a fat shaded library?

Osm4scala has a transitive dependency on the Java Google Protobuf library. Spark, Hadoop and other libraries in the ecosystem use an old version of the same library (currently v2.5.0, from March 2013) that is not compatible.

To solve the conflict, I published the library in two flavours (see the sbt example after this list):

  • Fat and shaded, as osm4scala-spark[2,3]-shaded, which solves the com.google.protobuf.** conflicts.
  • Non-shaded, as osm4scala-spark[2,3], so you can solve the conflict your own way.
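
For example, in sbt (a sketch; pick the artifact matching your Spark and Scala versions from the table above):

Sbt
// shaded flavour: protobuf is already relocated inside the jar
libraryDependencies += "com.acervera.osm4scala" % "osm4scala-spark3-shaded_2.12" % "1.0.11"
// non-shaded flavour: you must deal with the com.google.protobuf conflict yourself
libraryDependencies += "com.acervera.osm4scala" % "osm4scala-spark3_2.12" % "1.0.11"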

Spark Shell

  1. Start the Spark Shell as usual, using the --packages option to add the right dependency. The dependency depends on the Spark version you are using. Please check the reference table in the previous section.

    Scala
    bin/spark-shell --packages 'com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11'
    PySpark
    bin/pyspark --packages 'com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11'
    SQL
    bin/spark-sql --packages 'com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11'
  2. Create the Dataframe using the osm.pbf format, pointing to the pbf file or folder containing pbf files.

    Scala
    scala> val osmDF = spark.read.format("osm.pbf").load("<osm files path here>")
    PySpark
    >>> osmDF = spark.read.format("osm.pbf").load("<osm files path here>")
    SQL
    spark-sql> CREATE TABLE osm USING osm.pbf LOCATION '<osm files path here>';
  3. Use the created dataframe as usual, keeping in mind the schema explained previously.

    • In the next example, we count the number of each primitive type in the file. As explained in the schema, 0 means node, 1 way and 2 relation.

      Scala
      scala> osmDF.groupBy("type").count().show()
      +----+--------+
      |type| count|
      +----+--------+
      | 1| 2096455|
      | 2| 91971|
      | 0|19426617|
      +----+--------+
      PySpark
      >>> osmDF.groupBy("type").count().show()
      +----+--------+
      |type| count|
      +----+--------+
      | 1| 2096455|
      | 2| 91971|
      | 0|19426617|
      +----+--------+
      SQL
      spark-sql> select type, count(type) from osm group by type
      1 338795
      2 10357
      0 2328075
    • In this next example, we extract all traffic lights as POIs.

      Scala
      scala> osmDF.select("latitude", "longitude", "tags").where("element_at(tags, 'highway') == 'traffic_signals'").show(10,false)
      +------------------+-------------------+------------------------------------------------------------------------------+
      |latitude |longitude |tags |
      +------------------+-------------------+------------------------------------------------------------------------------+
      |54.59766649999997 |-5.8889806000000045|[highway -> traffic_signals] |
      |54.58006689999997 |-5.938683200000003 |[highway -> traffic_signals, traffic_signals -> signal] |
      |54.58260049999997 |-5.946187600000005 |[direction -> backward, highway -> traffic_signals, traffic_signals -> signal]|
      |51.90097769999996 |-8.470285700000005 |[highway -> traffic_signals] |
      |51.901616299999965|-8.470139700000004 |[highway -> traffic_signals] |
      |51.89978239999997 |-8.465829200000002 |[highway -> traffic_signals] |
      |51.89707529999997 |-8.474892800000001 |[highway -> traffic_signals] |
      |51.89784849999997 |-8.466895200000002 |[highway -> traffic_signals] |
      |51.89547809999997 |-8.476100900000002 |[highway -> traffic_signals] |
      |51.89772569999997 |-8.477145100000003 |[highway -> traffic_signals] |
      +------------------+-------------------+------------------------------------------------------------------------------+
      only showing top 10 rows
      PySpark
      >>> osmDF.select("latitude", "longitude", "tags").where("element_at(tags, 'highway') == 'traffic_signals'").show(10,False)
      +------------------+-------------------+------------------------------------------------------------------------------+
      |latitude |longitude |tags |
      +------------------+-------------------+------------------------------------------------------------------------------+
      |54.59766649999997 |-5.8889806000000045|[highway -> traffic_signals] |
      |54.58006689999997 |-5.938683200000003 |[highway -> traffic_signals, traffic_signals -> signal] |
      |54.58260049999997 |-5.946187600000005 |[direction -> backward, highway -> traffic_signals, traffic_signals -> signal]|
      |51.90097769999996 |-8.470285700000005 |[highway -> traffic_signals] |
      |51.901616299999965|-8.470139700000004 |[highway -> traffic_signals] |
      |51.89978239999997 |-8.465829200000002 |[highway -> traffic_signals] |
      |51.89707529999997 |-8.474892800000001 |[highway -> traffic_signals] |
      |51.89784849999997 |-8.466895200000002 |[highway -> traffic_signals] |
      |51.89547809999997 |-8.476100900000002 |[highway -> traffic_signals] |
      |51.89772569999997 |-8.477145100000003 |[highway -> traffic_signals] |
      +------------------+-------------------+------------------------------------------------------------------------------+
      only showing top 10 rows
      SQL
      spark-sql> select latitude, longitude, tags from osm where type = 0 and element_at(tags, "highway") == 'traffic_signals' limit 10;
      40.42125 -3.6844500000000004 {"crossing":"traffic_signals","crossing_ref":"zebra","highway":"traffic_signals"}
      40.41779000000001 -3.6241199999999996 {"highway":"traffic_signals"}
      40.41473000000003 -3.627109999999999 {"highway":"traffic_signals"}
      40.414200000000015 -3.6282099999999993 {"highway":"traffic_signals"}
      40.42635999999994 -3.727220000000005 {"crossing":"traffic_signals","highway":"traffic_signals"}
      40.41937999999995 -3.688820000000004 {"highway":"traffic_signals"}
      40.426489999999944 -3.687640000000004 {"highway":"traffic_signals","traffic_signals":"signal"}
      40.421339999999944 -3.683020000000004 {"highway":"traffic_signals"}
      40.41797999999994 -3.669340000000004 {"highway":"traffic_signals"}
      40.418319999999945 -3.6762200000000043 {"highway":"traffic_signals","traffic_signals":"signal"}
      Time taken: 0.128 seconds, Fetched 10 row(s)

Notebook

There are different notebook solutions on the market and each one uses a different way to import libraries, but after importing the library you can use the osm4scala connector in the same way.

For this section, we are going to use Jupyter Notebook and JupyterLab.

note

If you cannot access a Jupyter Notebook installation, you can use the jupyter/all-spark-notebook Docker image, as I do here. You can find full documentation about how to install and use it in the image's documentation.

Starting it is as easy as running the Docker image and using the link provided:

$ docker run -e JUPYTER_ENABLE_LAB=yes -d -p 8888:8888 -p 4040:4040 -p 4041:4041 jupyter/all-spark-notebook
[I 11:02:45.132 NotebookApp] Serving notebooks from local directory: /home/jovyan
[I 11:02:45.132 NotebookApp] Jupyter Notebook 6.2.0 is running at:
[I 11:02:45.132 NotebookApp] http://479a92a85698:8888/?token=60ace2db5d456f7348c2ba0399cab986e36f4de9XX00a554
[I 11:02:45.132 NotebookApp] or http://127.0.0.1:8888/?token=60ace2db5d456f7348c2ba0399cab986e36f4de9XX00a554
[I 11:02:45.132 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 11:02:45.135 NotebookApp]
To access the notebook, open this file in a browser:
file:///home/jovyan/.local/share/jupyter/runtime/nbserver-6-open.html
Or copy and paste one of these URLs:
http://479a92a85698:8888/?token=60ace2db5d456f7348c2ba0399cab986e36f4de9XX00a554
or http://127.0.0.1:8888/?token=60ace2db5d456f7348c2ba0399cab986e36f4de9XX00a554

If you prefer an online option, you can try MyBinder.

  1. Create a new Notebook. For Scala, we are going to use the spylon-kernel.

  2. Add a new cell and import the right osm4scala library for your notebook installation, following the table at the start of the All in one jar section. In our case, the version used is Spark v3.1.1 with Scala 2.12.

    %%init_spark
    launcher.packages = ["com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11"]

    If you have not executed anything yet, running the cell will start the Spark session. Sometimes, depending on the notebook used, you will need to restart the Spark session (or kernel session).

  3. After the previous step, you can start creating dataframes from pbf files as in the previous sections.

    Let's suppose that you uploaded a file called monaco-anonymized.osm.pbf into the notebook's work folder.

    If you create a new cell with the following content, you will get all traffic signals in Monaco.

    Scala
    val osmDF = spark.read.format("osm.pbf").load("/home/jovyan/work/monaco-anonymized.osm.pbf")
    osmDF.select("latitude", "longitude")
      .where("element_at(tags, 'highway') == 'traffic_signals'")
      .show
    PySpark
    %%python
    osm_df = spark.read.format("osm.pbf").load("/home/jovyan/work/monaco-anonymized.osm.pbf")
    osm_df.select("latitude", "longitude").where("element_at(tags, 'highway') == 'traffic_signals'").show()
    note

    Pay attention to the first line in the Python cell. Because the kernel uses Scala by default, you need to add the %%python header.

    Next, a screenshot of the generated output: Notebook Screenshot

    Of course, from the dataframe you can create beautiful maps, graphs, etc. But that is out of the scope of this documentation.

Spark application

When we need to write more complex analyses, data extractions, ETLs, etc., it is necessary to write Spark applications.

  1. Importing the Spark connector is not really necessary, because the integration is transparent.

    The only possible advantages (not available when using Python) are:

    • The use of static constants, for example, to avoid magic numbers for primitive and relation types.

    • Using the library as part of Unit Testing or Integration Testing.

    • Adding osm4scala jar library as part of the deployable artifact.

      For Python, as in Scala, it is not necessary to import the library except at runtime. But unlike Scala, you cannot easily import and use facilities from the Scala library, so in this case you can jump to the next step.

      Sbt
      libraryDependencies += "com.acervera.osm4scala" % "osm4scala-spark3-shaded_2.12" % "1.0.11"
      Maven
      <dependency>
          <groupId>com.acervera.osm4scala</groupId>
          <artifactId>osm4scala-spark3-shaded_2.12</artifactId>
          <version>1.0.11</version>
      </dependency>
      Reduce artifact size.

      The shaded dependency is nearly 5MB. You can add it as a package when you submit the job instead of including it in the generated deployable artifact. To do so, set the dependency scope to Test or Provided; see the sbt example below.

      If you don't know what I'm talking about, don't pay too much attention and forget it. 😉
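
      For example, in sbt (a sketch; the same idea applies to the Maven provided scope):

      Sbt
      libraryDependencies += "com.acervera.osm4scala" % "osm4scala-spark3-shaded_2.12" % "1.0.11" % Provided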

  2. Create the Dataframe using the osm.pbf format, pointing to the pbf file or folder containing pbf files, and use as usual.

    Scala / PrimitivesCounter.scala
    import com.acervera.osm4scala.spark.OsmSqlEntity
    import org.apache.spark.sql.SparkSession

    object PrimitivesCounter {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("Primitives counter")
          .getOrCreate()

        spark.read
          .format("osm.pbf")
          .load(args(0))
          .groupBy(OsmSqlEntity.FIELD_TYPE)
          .count
          .show
      }

    }

    PySpark / PrimiriveCounter.py
    from pyspark.sql import SparkSession
    import sys

    if __name__ == '__main__':
        spark = SparkSession.builder.appName("Primitives counter").getOrCreate()
        spark.read.format("osm.pbf")\
            .load(sys.argv[1])\
            .groupBy("type")\
            .count()\
            .show()
  3. Submit the application to your Spark cluster.

    Scala
    bin/spark-submit \
    --packages 'com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11' \
    examples/spark-documentation/target/scala-2.12/osm4scala-examples-spark-documentation_2.12-1.0.11.jar \
    /tmp/osm/monaco-anonymized.osm.pbf
    PySpark
    bin/spark-submit \
    --packages 'com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11' \
    examples/spark-documentation/src/main/scala/com/acervera/osm4scala/examples/spark/documentation/PrimiriveCounter.py \
    /tmp/osm/monaco-anonymized.osm.pbf
    Optional --packages.

    You will not need to add --packages 'com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.11' if it is part of the deployed artifact.

More Examples

Below are more examples. This time, we create a SQL temporary view and query it with SQL:

scala> val osmDF = spark.sqlContext.read.format("osm.pbf").load("<osm files path here>")
osmDF: org.apache.spark.sql.DataFrame = [id: bigint, type: tinyint ... 5 more fields]
scala> osmDF.createOrReplaceTempView("osm")
Primitives counter
scala> spark.sql("select type, count(*) as num_primitives from osm group by type").show()
+----+--------------+
|type|num_primitives|
+----+--------------+
| 1| 338795|
| 2| 10357|
| 0| 2328075|
+----+--------------+
Extract all keys used in tags
scala> spark.sql("select distinct(explode(map_keys(tags))) as tag_key from osm order by tag_key asc").show()
+------------------+
| tag_key|
+------------------+
| Calle|
| Conference|
| Exper|
| FIXME|
| ISO3166-1|
| ISO3166-1:alpha2|
| ISO3166-1:alpha3|
| ISO3166-1:numeric|
| ISO3166-2|
| MAC_dec|
| Nombre|
| Numero|
| Open|
| Peluqueria|
| Residencia UEM|
| Telefono|
| abandoned|
| abandoned:amenity|
| abandoned:barrier|
|abandoned:building|
+------------------+
only showing top 20 rows
Extract id, coords and tags from all nodes
scala> spark.sql("select id, latitude, longitude, tags from osm where type = 0").show()
+--------+------------------+-------------------+--------------------+
| id| latitude| longitude| tags|
+--------+------------------+-------------------+--------------------+
| 171933| 40.42006|-3.7016600000000004| []|
| 171946| 40.42125|-3.6844500000000004|[highway -> traff...|
| 171948|40.420230000000004|-3.6877900000000006| []|
| 171951|40.417350000000006|-3.6889800000000004| []|
| 171952| 40.41499|-3.6889800000000004| []|
| 171953| 40.41277|-3.6889000000000003| []|
| 171954| 40.40946|-3.6887900000000005| []|
| 171959| 40.40326|-3.7012200000000006| []|
|20952874| 40.42099|-3.6019200000000007| []|
|20952875|40.422610000000006|-3.5994900000000007| []|
|20952878| 40.42136000000001| -3.601470000000001| []|
|20952879| 40.42262000000001| -3.599770000000001| []|
|20952881| 40.42905000000001|-3.5970500000000007| []|
|20952883| 40.43131000000001|-3.5961000000000007| []|
|20952888| 40.42930000000001| -3.596590000000001| []|
|20952890| 40.43012000000001|-3.5961500000000006| []|
|20952891| 40.43043000000001|-3.5963600000000007| []|
|20952892| 40.43057000000001|-3.5969100000000007| []|
|20952893| 40.43039000000001|-3.5973200000000007| []|
|20952895| 40.42967000000001|-3.5972300000000006| []|
+--------+------------------+-------------------+--------------------+
only showing top 20 rows
Extract id, nodes and tags from all ways
scala> spark.sql("select id, nodes, tags from osm where type = 1").show()
+-------+--------------------+--------------------+
| id| nodes| tags|
+-------+--------------------+--------------------+
|3996189|[23002322, 230022...|[name -> M-40, in...|
|3996190|[20952892, 213645...|[name -> Avenida ...|
|3996191|[21364526, 253693...|[lanes -> 2, onew...|
|3996192|[20952914, 242495...|[name -> Plaza de...|
|3996195|[20952923, 421448...|[name -> Calle de...|
|3996196|[20952942, 209529...|[name -> Avenida ...|
|3996197|[20952893, 209628...|[name -> Avenida ...|
|3996199|[20952929, 209529...|[name -> Calle de...|
|3996203|[20952948, 391553...|[name -> Calle de...|
|3997425|[20960686, 219912...|[name -> Avenida ...|
|3997426|[2424952617, 2095...|[name -> Avenida ...|
|3997427|[20960717, 209606...|[name -> Calle de...|
|3997428|[20960693, 209607...|[highway -> terti...|
|3997429|[20960696, 421448...|[name -> Calle de...|
|3997430|[20963025, 209630...|[name -> Paseo de...|
|3997432|[20960688, 209607...|[name -> Calle de...|
|3997433|[1811010970, 1811...|[name -> Calle de...|
|4004278|[255148257, 21067...|[name -> Calle de...|
|4004280|[20963101, 209630...|[name -> Calle de...|
|4004281|[25530614, 297977...|[name -> Calle de...|
+-------+--------------------+--------------------+
only showing top 20 rows
Extract id, relations and tags from all relations
scala> spark.sql("select id, relations, tags from osm where type = 2").show()
+-----+--------------------+--------------------+
| id| relations| tags|
+-----+--------------------+--------------------+
|11331|[[2609596233, 0, ...|[network -> Cerca...|
|11332|[[196618381, 1, p...|[network -> Cerca...|
|14612|[[24698019, 1, ou...|[website -> http:...|
|30117|[[26629303, 1, ou...|[type -> multipol...|
|30399|[[307006515, 1, i...|[website -> http:...|
|38757|[[6120746, 1, ], ...|[network -> lcn, ...|
|38965|[[44571128, 1, fr...|[type -> restrict...|
|48292|[[317775809, 0, s...|[network -> Metro...|
|49958|[[308868559, 0, v...|[type -> restrict...|
|49959|[[308868558, 0, v...|[type -> restrict...|
|50874|[[26141446, 1, ou...|[name -> Escuela ...|
|52312|[[24531942, 1, ou...|[name -> Pista pr...|
|52313|[[24698560, 1, ou...|[type -> multipol...|
|53157|[[2609596077, 0, ...|[network -> Cerca...|
|55085|[[246285922, 0, s...|[network -> Cerca...|
|55087|[[194005015, 1, ]...|[network -> Cerca...|
|55799|[[28775036, 1, ou...|[type -> multipol...|
|56044|[[258556530, 0, s...|[network -> Metro...|
|56260|[[144383571, 1, o...|[name -> Ayuntami...|
|56791|[[32218973, 0, st...|[network -> Metro...|
+-----+--------------------+--------------------+
only showing top 20 rows
Extract id, type and info metadata (version, user and timestamp) from all primitives with author information
scala> spark.sql("select id, type, info.version, info.userId, info.userName, date_format(info.timestamp, \"dd-MMM-y kk:mm:ss z\") as timestamp from osm where info.userId IS NOT NULL").show(5, false)
+---------+----+-------+--------+------------+------------------------+
|id |type|version|userId |userName |timestamp |
+---------+----+-------+--------+------------+------------------------+
|10966459 |2 |26 |18XXXX |XXXXX |01-Oct-2020 18:44:49 IST|
|166399497|1 |4 |16XXXXX |XXXXXXXXXXXX|09-Aug-2019 05:52:21 IST|
|434583789|1 |7 |25XXXX |XXXXXX |05-Mar-2020 06:48:01 IST|
|161752645|1 |14 |11XXXXXX|XXXXXXXXXXX |22-Mar-2021 09:08:10 IST|
|690021772|1 |1 |31XXXX |XXXXX |14-May-2019 19:18:06 IST|
+---------+----+-------+--------+------------+------------------------+
only showing top 5 rows

Plain (non-shaded jar) dependency

Sometimes we need to write more complex applications: analyses, data extractions, ETLs, integrations with other libraries, unit testing, etc. In that case, the best practice is to manage dependencies using sbt or Maven, instead of importing the shaded jar.

OSM pbf files are based on Protocol Buffers, so ScalaPB is used as the deserializer and is the only transitive dependency.

These are the Spark/Scala version combinations available for the latest release, v1.0.11:

Spark branch   ScalaPB   Scala   Package
2.4            0.9.7     2.11    com.acervera.osm4scala:osm4scala-spark2_2.11:1.0.11
2.4            0.10.2    2.12    com.acervera.osm4scala:osm4scala-spark2_2.12:1.0.11
3.0 / 3.1      0.10.2    2.12    com.acervera.osm4scala:osm4scala-spark3_2.12:1.0.11

After importing the connector, you can use it as explained in the All in one jar section. So let's see how to import the library into our project, plus a few examples.
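
For example, with sbt (a sketch; pick the artifact that matches your Spark and Scala versions from the table above):

Sbt
libraryDependencies += "com.acervera.osm4scala" % "osm4scala-spark3_2.12" % "1.0.11"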

Resolving dependency conflicts

Osm4scala has a transitive dependency on the Java Google Protobuf library. Spark, Hadoop and other libraries in the ecosystem use an older version of the same library (currently v2.5.0, from March 2013) that is not compatible.

To resolve this conflict, you will need to shade your deployed jar. The conflict comes from the package com.google.protobuf.

Here is how to do it using sbt:

Sbt
assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("com.google.protobuf.**" -> "shadeproto.@1")
    .inAll
)
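
Note that assemblyShadeRules comes from the sbt-assembly plugin, so it must be enabled in your build. A sketch of project/plugins.sbt (pick a current plugin version; the version here is a placeholder):

Sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "<version here>")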

It is possible to do the same using the Maven Shade plugin.