diff --git a/README.md b/README.md index bc968b8..403c76b 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,13 @@ by Abishek Ramdas and Ghislain Fourny -This is the Python edition of [RumbleDB](https://rumbledb.org/), which brings [JSONiq](https://www.jsoniq.org) to the world of Spark and DataFrames. JSONiq is a language considerably more powerful than SQL as it can process [messy, heterogeneous datasets](https://arxiv.org/abs/1910.11582), from kilobytes to Petabytes, with very little coding effort. +This is the Python edition of [RumbleDB](https://rumbledb.org/), which brings [JSONiq](https://www.jsoniq.org) to the world of Python. -The Python edition of RumbleDB is currently only a prototype (alpha) and probably unstable. We welcome bug reports. +JSONiq is a language considerably more powerful than SQL as it can process [messy, heterogeneous datasets](https://arxiv.org/abs/1910.11582), from kilobytes to Petabytes, with very little coding effort. + +Spark aficionados can also pass DataFrames to JSONiq queries and take back DataFrames. This gives them an environment in which both Spark SQL and JSONiq co-exist to manipulate the data. + +The Python edition of RumbleDB is currently a prototype (alpha) and probably unstable. We welcome bug reports and feedback. ## About RumbleDB @@ -24,20 +28,33 @@ A RumbleSession is a wrapper around a SparkSession that additionally makes sure JSONiq queries are invoked with rumble.jsoniq() in a way similar to the way Spark SQL queries are invoked with spark.sql(). -Any number of Python DataFrames can be attached to external JSONiq variables used in the query. It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://rumble.readthedocs.io/en/latest/). +JSONiq variables can be bound to lists of JSON values (str, int, float, True, False, None, dict, list) or to Pyspark DataFrames. A JSONiq query can use as many variables as needed (for example, it can join between different collections). -The resulting sequence of items can be retrieved as DataFrame, as an RDD, as a Python list, or with a streaming iteration over the items. +It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://rumble.readthedocs.io/en/latest/). -The individual items can be processed using the RumbleDB [Item API](https://github.com/RumbleDB/rumble/blob/master/src/main/java/org/rumbledb/api/Item.java). +The resulting sequence of items can be retrieved as a list of JSON values, as a Pyspark DataFrame, or, for advanced users, as an RDD or with a streaming iteration over the items using the [RumbleDB Item API](https://github.com/RumbleDB/rumble/blob/master/src/main/java/org/rumbledb/api/Item.java). -Alternatively, it is possible to directly get a Python list of JSON values, or a streaming iteration of JSON values. This is a convenience that makes it unnecessary to use the Item API, especially for a first-time user. +It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Pyspark. -It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Spark. - -The design goal is that it should be possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc. +The design goal is that it is possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc. Any feedback or error reports are very welcome. +## Type mapping + +When passing Python values to JSONiq or getting them from a JSONiq queries, the mapping is as follows: + +| Python | JSONiq | +|-------|-------| +|dict|object| +|list|array| +|str|string| +|int|integer| +|bool|boolean| +|None|null| + +Furthermore, other JSONiq types will be mapped to string literals. Users who want to preserve JSONiq types can use the Item API instead. + ## Installation Install with @@ -49,7 +66,7 @@ pip install jsoniq ## Sample code -We will make more documentation available as we go. In the meantime, you will find a sample code below that should just run +We will make more documentation available as we go. In the meantime, you will find a sample, commented code below that should just run after installing the library. You can directly copy paste the code below to a Python file and execute it with Python. @@ -60,15 +77,145 @@ from jsoniq import RumbleSession # The syntax to start a session is similar to that of Spark. # A RumbleSession is a SparkSession that additionally knows about RumbleDB. # All attributes and methods of SparkSession are also available on RumbleSession. -rumble = RumbleSession.builder.appName("PyRumbleExample").getOrCreate(); + +rumble = RumbleSession.builder.getOrCreate(); + +# Just to improve readability when invoking Spark methods +# (such as spark.sql() or spark.createDataFrame()). +spark = rumble + +############################## +###### Your first query ###### +############################## + +# Even though RumbleDB uses Spark internally, it can be used without any knowledge of Spark. + +# Executing a query is done with rumble.jsoniq() like so. A query returns a sequence +# of items, here the sequence with just the integer item 2. +items = rumble.jsoniq('1+1') + +# A sequence of items can simply be converted to a list of Python/JSON values with json(). +# Since there is only one value in the sequence output by this query, +# we get a singleton list with the integer 2. +# Generally though, the results may contain zero, one, two, or more items. +python_list = items.json() +print(python_list) + +############################################ +##### More complex, standalone queries ##### +############################################ + +# JSONiq is very powerful and expressive. You will find tutorials as well as a reference on JSONiq.org. + +seq = rumble.jsoniq(""" + +let $stores := +[ + { "store number" : 1, "state" : "MA" }, + { "store number" : 2, "state" : "MA" }, + { "store number" : 3, "state" : "CA" }, + { "store number" : 4, "state" : "CA" } +] +let $sales := [ + { "product" : "broiler", "store number" : 1, "quantity" : 20 }, + { "product" : "toaster", "store number" : 2, "quantity" : 100 }, + { "product" : "toaster", "store number" : 2, "quantity" : 50 }, + { "product" : "toaster", "store number" : 3, "quantity" : 50 }, + { "product" : "blender", "store number" : 3, "quantity" : 100 }, + { "product" : "blender", "store number" : 3, "quantity" : 150 }, + { "product" : "socks", "store number" : 1, "quantity" : 500 }, + { "product" : "socks", "store number" : 2, "quantity" : 10 }, + { "product" : "shirt", "store number" : 3, "quantity" : 10 } +] +let $join := + for $store in $stores[], $sale in $sales[] + where $store."store number" = $sale."store number" + return { + "nb" : $store."store number", + "state" : $store.state, + "sold" : $sale.product + } +return [$join] +"""); + +print(seq.json()); + +seq = rumble.jsoniq(""" +for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10) +group by $store-number := $product.store-number +order by $store-number ascending +return { + "store" : $store-number, + "products" : [ distinct-values($product.product) ] +} +"""); +print(seq.json()); + +############################################################ +###### Binding JSONiq variables to Python values ########### +############################################################ + +# It is possible to bind a JSONiq variable to a list of native Python values +# and then use it in a query. +# JSONiq, variables are bound to sequences of items, just like the results of JSONiq +# queries are sequence of items. +# A Python list will be seamlessly converted to a sequence of items by the library. +# Currently we only support strs, ints, floats, booleans, None, lists, and dicts. +# But if you need more (like date, bytes, etc) we will add them without any problem. +# JSONiq has a rich type system. + +rumble.bind('$c', [1,2,3,4, 5, 6]) +print(rumble.jsoniq(""" +for $v in $c +let $parity := $v mod 2 +group by $parity +return { switch($parity) + case 0 return "even" + case 1 return "odd" + default return "?" : $v +} +""").json()) + +rumble.bind('$c', [[1,2,3],[4,5,6]]) +print(rumble.jsoniq(""" +for $i in $c +return [ + for $j in $i + return { "foo" : $j } +] +""").json()) + +rumble.bind('$c', [{"foo":[1,2,3]},{"foo":[4,{"bar":[1,False, None]},6]}]) +print(rumble.jsoniq('{ "results" : $c.foo[[2]] }').json()) + +# It is possible to bind only one value. The it must be provided as a singleton list. +# This is because in JSONiq, an item is the same a sequence of one item. +rumble.bind('$c', [42]) +print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json()) + +# For convenience and code readability, you can also use bindOne(). +rumble.bindOne('$c', 42) +print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json()) + + +################################################ +##### Using Pyspark DataFrames with JSONiq ##### +################################################ + +# The power users can also interface our library with pyspark DataFrames. +# JSONiq sequences of items can have billions of items, and our library supports this +# out of the box: it can also run on clusters on AWS Elastic MapReduce for example. +# But your laptop is just fine, too: it will spread the computations on your cores. +# You can bind a DataFrame to a JSONiq variable. JSONiq will recognize this +# DataFrame as a sequence of object items. # Create a data frame also similar to Spark (but using the rumble object). data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]; columns = ["Name", "Age"]; -df = rumble.createDataFrame(data, columns); +df = spark.createDataFrame(data, columns); # This is how to bind a JSONiq variable to a dataframe. You can bind as many variables as you want. -rumble.bindDataFrameAsVariable('$a', df); +rumble.bind('$a', df); # This is how to run a query. This is similar to spark.sql(). # Since variable $a was bound to a DataFrame, it is automatically declared as an external variable @@ -101,18 +248,18 @@ df.show(); # A DataFrame output by JSONiq can be reused as input to a Spark SQL query. # (Remember that rumble is a wrapper around a SparkSession object, so you can use rumble.sql() just like spark.sql()) -df.createTempView("input") -df2 = rumble.sql("SELECT * FROM input").toDF("name"); +df.createTempView("myview") +df2 = spark.sql("SELECT * FROM myview").toDF("name"); df2.show(); # A DataFrame output by Spark SQL can be reused as input to a JSONiq query. -rumble.bindDataFrameAsVariable('$b', df2); +rumble.bind('$b', df2); seq2 = rumble.jsoniq("for $i in 1 to 5 return $b"); df3 = seq2.df(); df3.show(); # And a DataFrame output by JSONiq can be reused as input to another JSONiq query. -rumble.bindDataFrameAsVariable('$b', df3); +rumble.bind('$b', df3); seq3 = rumble.jsoniq("$b[position() lt 3]"); df4 = seq3.df(); df4.show(); @@ -170,54 +317,6 @@ seq.write().mode("overwrite").parquet("outputparquet"); seq = rumble.jsoniq("1+1"); seq.write().mode("overwrite").text("outputtext"); -############################################ -##### More complex, standalone queries ##### -############################################ - -seq = rumble.jsoniq(""" - -let $stores := -[ - { "store number" : 1, "state" : "MA" }, - { "store number" : 2, "state" : "MA" }, - { "store number" : 3, "state" : "CA" }, - { "store number" : 4, "state" : "CA" } -] -let $sales := [ - { "product" : "broiler", "store number" : 1, "quantity" : 20 }, - { "product" : "toaster", "store number" : 2, "quantity" : 100 }, - { "product" : "toaster", "store number" : 2, "quantity" : 50 }, - { "product" : "toaster", "store number" : 3, "quantity" : 50 }, - { "product" : "blender", "store number" : 3, "quantity" : 100 }, - { "product" : "blender", "store number" : 3, "quantity" : 150 }, - { "product" : "socks", "store number" : 1, "quantity" : 500 }, - { "product" : "socks", "store number" : 2, "quantity" : 10 }, - { "product" : "shirt", "store number" : 3, "quantity" : 10 } -] -let $join := - for $store in $stores[], $sale in $sales[] - where $store."store number" = $sale."store number" - return { - "nb" : $store."store number", - "state" : $store.state, - "sold" : $sale.product - } -return [$join] -"""); - -print(seq.json()); - -seq = rumble.jsoniq(""" -for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10) -group by $store-number := $product.store-number -order by $store-number ascending -return { - "store" : $store-number, - "products" : [ distinct-values($product.product) ] -} -"""); -print(seq.json()); - ``` # How to learn JSONiq, and more query examples @@ -225,6 +324,12 @@ Even more queries can be found [here](https://colab.research.google.com/github/R # Last updates +## Version 0.1.0 alpha 12 +- Allow to bind JSONiq variables to Python values (mapping Python lists to sequences of items). This makes it possible to manipulate Python values directly with JSONiq and even without any knowledge of Spark at all. +- renamed bindDataFrameAsVariable() to bind(), which can be used both with DataFrames and Python lists. +- add bindOne() for binding a single value to a JSONiq variable. +- wrapping df() in a Pyspark DataFrame to make sure it can be used with pyspark DataFrame transformations. + ## Version 0.1.0 alpha 11 - Fix an issue when feeding a DataFrame output by rumble.jsoniq() back to a new JSONiq query (as a variable). diff --git a/pyproject.toml b/pyproject.toml index 97b10b6..b0dc675 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "jsoniq" -version = "0.1.0a11" +version = "0.1.0a12" description = "Python edition of RumbleDB, a JSONiq engine" requires-python = ">=3.11" dependencies = [ @@ -21,6 +21,8 @@ classifiers = [ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", ] [tool.setuptools.packages.find] diff --git a/src/jsoniq/jars/rumbledb-1.24.0.jar b/src/jsoniq/jars/rumbledb-1.24.0.jar index 8bbe750..e7f1ada 100644 Binary files a/src/jsoniq/jars/rumbledb-1.24.0.jar and b/src/jsoniq/jars/rumbledb-1.24.0.jar differ diff --git a/src/jsoniq/sequence.py b/src/jsoniq/sequence.py index b2ca8f8..f7f2b5d 100644 --- a/src/jsoniq/sequence.py +++ b/src/jsoniq/sequence.py @@ -1,11 +1,13 @@ from pyspark import RDD from pyspark.sql import SparkSession +from pyspark.sql import DataFrame import json class SequenceOfItems: - def __init__(self, sequence, sparkcontext): + def __init__(self, sequence, sparksession): self._jsequence = sequence - self._sparkcontext = sparkcontext + self._sparkcontext = sparksession.sparkContext + self._sparksession = sparksession def json(self): return [json.loads(l.serializeAsJSON()) for l in self._jsequence.items()] @@ -15,6 +17,9 @@ def rdd(self): rdd = RDD(rdd, self._sparkcontext) return rdd.map(lambda l: json.loads(l)) + def df(self): + return DataFrame(self._jsequence.getAsDataFrame(), self._sparksession) + def nextJSON(self): return self._jsequence.next().serializeAsJSON() diff --git a/src/jsoniq/session.py b/src/jsoniq/session.py index e5cc968..66ddd95 100644 --- a/src/jsoniq/session.py +++ b/src/jsoniq/session.py @@ -83,6 +83,50 @@ def __getattr__(self, name): _builder = Builder() + def convert(self, value): + if isinstance(value, bool): + return self._sparksession._jvm.org.rumbledb.items.ItemFactory.getInstance().createBooleanItem(value) + elif isinstance(value, str): + return self._sparksession._jvm.org.rumbledb.items.ItemFactory.getInstance().createStringItem(value) + elif isinstance(value, int): + return self._sparksession._jvm.org.rumbledb.items.ItemFactory.getInstance().createLongItem(value) + elif isinstance(value, float): + return self._sparksession._jvm.org.rumbledb.items.ItemFactory.getInstance().createDoubleItem(value) + elif value is None: + return self._sparksession._jvm.org.rumbledb.items.ItemFactory.getInstance().createNullItem() + elif isinstance(value, list): + java_list = self._sparksession._jvm.java.util.ArrayList() + for v in value: + java_list.add(self.convert(v)) + return self._sparksession._jvm.org.rumbledb.items.ItemFactory.getInstance().createArrayItem(java_list, False) + elif isinstance(value, dict): + java_map = self._sparksession._jvm.java.util.HashMap() + for k, v in value.items(): + java_list = self._sparksession._jvm.java.util.ArrayList() + java_list.add(self.convert(v)) + java_map[k] = java_list + return self._sparksession._jvm.org.rumbledb.items.ItemFactory.getInstance().createObjectItem(java_map, False) + else: + raise ValueError("Cannot yet convert value of type " + str(type(value)) + " to a RumbleDB item. Please open an issue and we will look into it!") + + def bind(self, name: str, valueToBind): + conf = self._jrumblesession.getConfiguration(); + if not name.startswith("$"): + raise ValueError("Variable name must start with a dollar symbol ('$').") + name = name[1:] + if isinstance(valueToBind, list): + items = [ self.convert(value) for value in valueToBind] + conf.setExternalVariableValue(name, items) + return self + if(hasattr(valueToBind, "_get_object_id")): + conf.setExternalVariableValue(name, valueToBind); + else: + conf.setExternalVariableValue(name, valueToBind._jdf); + return self; + + def bindOne(self, name: str, value): + return self.bind(name, [value]) + def bindDataFrameAsVariable(self, name: str, df): conf = self._jrumblesession.getConfiguration(); if not name.startswith("$"): @@ -96,7 +140,7 @@ def bindDataFrameAsVariable(self, name: str, df): def jsoniq(self, str): sequence = self._jrumblesession.runQuery(str); - return SequenceOfItems(sequence, self._sparksession.sparkContext); + return SequenceOfItems(sequence, self._sparksession); def __getattr__(self, item): return getattr(self._sparksession, item) \ No newline at end of file