Apache Spark (http://spark.apache.org/) is a popular open source data processing engine. It can be integrated with MariaDB ColumnStore through Spark SQL.
There are currently two ways to interact with ColumnStore from Spark. The first is to use the ColumnStoreExporter, which is part of the Bulk Data Adapters. ColumnStoreExporter exports DataFrames into existing ColumnStore tables and is orders of magnitude faster than writing DataFrames through JDBC. The second is to use the MariaDB Java Connector and connect through JDBC. This is especially useful for reading data from ColumnStore into Spark and for changing ColumnStore's database structure through DDL.
ColumnStoreExporter connects Spark and ColumnStore through ColumnStore's bulk write API.
The following steps outline how to install and configure the MariaDB ColumnStoreExporter so that it is available in the Spark runtime:
For Debian 8, 9 and Ubuntu 16.04:
spark.driver.extraClassPath /usr/lib/javamcsapi.jar:/usr/lib/spark-scala-mcsapi-connector.jar
spark.executor.extraClassPath /usr/lib/javamcsapi.jar:/usr/lib/spark-scala-mcsapi-connector.jar
For CentOS 7:
spark.driver.extraClassPath /usr/lib64/javamcsapi.jar:/usr/lib64/spark-scala-mcsapi-connector.jar
spark.executor.extraClassPath /usr/lib64/javamcsapi.jar:/usr/lib64/spark-scala-mcsapi-connector.jar
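In the entries above, each property is followed by a single classpath string in which the jar paths are joined with colons, and the driver and executor entries each go on their own line (the key, whitespace, value layout of Spark's spark-defaults.conf). As a minimal sketch, the hypothetical helper `classpath_conf` below (not part of Spark or ColumnStore) generates both entries from a list of jars, so the Debian/Ubuntu (/usr/lib) and CentOS 7 (/usr/lib64) variants differ only in the directory passed in:

```python
def classpath_conf(jars):
    # Hypothetical helper: builds the two spark-defaults.conf entries that
    # put the given jars on both the driver and the executor classpath.
    classpath = ":".join(jars)  # Linux classpath entries are colon-separated
    keys = ("spark.driver.extraClassPath", "spark.executor.extraClassPath")
    return "\n".join("%s %s" % (key, classpath) for key in keys)


# CentOS 7 keeps the jars in /usr/lib64; Debian and Ubuntu use /usr/lib.
libdir = "/usr/lib64"
print(classpath_conf([libdir + "/javamcsapi.jar",
                      libdir + "/spark-scala-mcsapi-connector.jar"]))
```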
For Python 2.7, the Python modules are installed in /usr/lib/python2.7/dist-packages on Debian 8, 9 and Ubuntu 16.04, and in /usr/lib/python2.7/site-packages on CentOS 7.
For Python 3, they are installed in /usr/lib/python3/dist-packages on Debian 8, 9 and Ubuntu 16.04, and in /usr/lib/python3.4/site-packages on CentOS 7.
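If the Python interpreter used by Spark does not already search these directories, one option is to add the matching directory to `sys.path` before importing `columnStoreExporter`. The sketch below only encodes the directories listed above; the `MODULE_DIRS` table, the `module_dir` helper and the `DISTRO_FAMILY` setting are hypothetical and have to be adjusted for the target machine:

```python
import os
import sys

# Directories listed above, keyed by (Python major version, distribution family).
# "debian" stands for Debian 8, 9 and Ubuntu 16.04; "centos" for CentOS 7.
MODULE_DIRS = {
    (2, "debian"): "/usr/lib/python2.7/dist-packages",
    (2, "centos"): "/usr/lib/python2.7/site-packages",
    (3, "debian"): "/usr/lib/python3/dist-packages",
    (3, "centos"): "/usr/lib/python3.4/site-packages",
}


def module_dir(python_major, distro_family):
    # Hypothetical lookup helper; raises KeyError for unsupported platforms.
    return MODULE_DIRS[(python_major, distro_family)]


DISTRO_FAMILY = "debian"  # hypothetical setting: use "centos" on CentOS 7
path = module_dir(sys.version_info[0], DISTRO_FAMILY)
if os.path.isdir(path) and path not in sys.path:
    # Makes `import columnStoreExporter` also search this directory.
    sys.path.append(path)
```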
ColumnStoreExporter is compatible with Python 2.7, Python 3 and Scala.
Its usage is simple: ColumnStoreExporter.export(database, table, dataframe). However, the DataFrame and the target table must have the same structure.
The following simple demonstration uses ColumnStoreExporter to export a DataFrame containing the numbers 0 to 127 and their ASCII characters into an existing table created with the following DDL:
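Because the DataFrame has to match the table's structure, it can help to compare column names before calling export. The following is a minimal sketch, assuming that columns are matched by position; `ordered_columns` is a hypothetical helper that checks names only (case-sensitively) and ignores column types:

```python
def ordered_columns(df_columns, table_columns):
    # Hypothetical pre-export check: returns the column names in the target
    # table's order, or raises ValueError if the two sets of names differ.
    # Assumes the exporter matches columns by position; types are not checked.
    missing = [c for c in table_columns if c not in df_columns]
    extra = [c for c in df_columns if c not in table_columns]
    if missing or extra:
        raise ValueError("missing columns: %s; unexpected columns: %s"
                         % (missing, extra))
    return list(table_columns)


# Column order of the test.spark table used in the example.
table_columns = ["ascii_representation", "number"]
print(ordered_columns(["number", "ascii_representation"], table_columns))
```

With PySpark, the returned list can be passed to `DataFrame.select`, for example `asciiDF.select(*ordered_columns(asciiDF.columns, table_columns))`, to put a DataFrame's columns in the table's order before exporting it.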
CREATE TABLE test.spark (ascii_representation CHAR(1), number INT) ENGINE=COLUMNSTORE;
Python 2.7 / 3
# necessary imports
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import columnStoreExporter
# create the Spark context
sc = SparkContext("local", "MariaDB Spark ColumnStore Example")
sqlContext = SQLContext(sc)
# create the test dataframe; Row fields are in the table's column order (ascii_representation, number)
asciiDF = sqlContext.createDataFrame(sc.parallelize(range(0, 128)).map(lambda i: Row(ascii_representation=chr(i), number=i)))
# export the dataframe
columnStoreExporter.export("test", "spark", asciiDF)
Scala
// necessary imports
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.mariadb.columnstore.api.connector.ColumnStoreExporter
// get the spark session
val spark: SparkSession = SparkSession.builder.master("local").appName("MariaDB Spark ColumnStore Example").getOrCreate
import spark.implicits._
val sc = spark.sparkContext
// create the test dataframe
val asciiDF = sc.makeRDD(0 until 128).map(i => (i.toChar.toString, i)).toDF("ascii_representation", "number")
// export the dataframe
ColumnStoreExporter.export("test", "spark", asciiDF)
The following documents provide SDK documentation:
The MariaDB Java Connector connects Spark and ColumnStore through JDBC.
The following steps outline how to install and configure the MariaDB Java Connector so that it is available in the Spark runtime:
spark.driver.extraClassPath /usr/share/java/mariadb-java-client-1.5.7.jar
spark.executor.extraClassPath /usr/share/java/mariadb-java-client-1.5.7.jar
Currently, Spark does not correctly recognize MariaDB-specific JDBC connection strings, so the jdbc:mysql syntax must be used. The following simple PySpark script reads the results table from the ColumnStore UM server columnstore_1 into a Spark DataFrame:
from pyspark import SparkContext
from pyspark.sql import DataFrameReader, SQLContext
url = 'jdbc:mysql://columnstore_1:3306/test'
properties = {'user': 'root', 'driver': 'org.mariadb.jdbc.Driver'}
sc = SparkContext("local", "ColumnStore Simple Query Demo")
sqlContext = SQLContext(sc)
df = DataFrameReader(sqlContext).jdbc(url=url, table='results', properties=properties)
df.show()
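The URL and properties used in the script above can be kept in one place when several scripts connect to the same server. As a sketch, the hypothetical `columnstore_jdbc` helper below assembles a URL in the jdbc:mysql form together with the properties dictionary passed to the reader; the host, database and user values are the ones from the example and will differ per installation, and the optional password entry is an assumption about a typical setup:

```python
def columnstore_jdbc(host, database, user, port=3306, password=None):
    # Hypothetical helper: returns (url, properties) for Spark's JDBC reader.
    # Uses the jdbc:mysql:// scheme, as required for Spark here, while still
    # loading the MariaDB driver class explicitly.
    url = "jdbc:mysql://%s:%d/%s" % (host, port, database)
    properties = {"user": user, "driver": "org.mariadb.jdbc.Driver"}
    if password is not None:
        properties["password"] = password
    return url, properties


url, properties = columnstore_jdbc("columnstore_1", "test", "root")
```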
Spark SQL currently offers very limited push-down capabilities. To take advantage of ColumnStore's ability to perform GROUP BY efficiently, an inline table (a subquery in parentheses with an alias) must be used, for example:
df = DataFrameReader(sqlContext).jdbc(url=url, table='(select year, sum(closed_roll_assess_land_value) sum_land_value from property_tax group by year) pt', properties=properties)
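This works because Spark places the table argument in the FROM clause of the query it sends over JDBC, so a parenthesized subquery with an alias is accepted; the aggregation then runs inside ColumnStore and only the grouped rows reach Spark. A minimal sketch of a hypothetical `inline_table` helper that builds such a derived-table expression:

```python
import re


def inline_table(query, alias):
    # Hypothetical helper: wraps a query as a derived table. Spark inserts the
    # table value into the FROM clause of the SELECT it sends over JDBC, and
    # MariaDB requires every derived table to have an alias.
    if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", alias):
        raise ValueError("alias must be a plain identifier: %r" % alias)
    # A trailing semicolon is not allowed inside a subquery, so drop it.
    return "(%s) %s" % (query.strip().rstrip(";").strip(), alias)


grouped = inline_table(
    "select year, sum(closed_roll_assess_land_value) sum_land_value "
    "from property_tax group by year",
    "pt",
)
# With the reader from the examples above:
# df = DataFrameReader(sqlContext).jdbc(url=url, table=grouped, properties=properties)
```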
© 2019 MariaDB
Licensed under the Creative Commons Attribution 3.0 Unported License and the GNU Free Documentation License.
https://mariadb.com/kb/en/mariadb-columnstore-with-spark/