Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 170 additions & 65 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

by Abishek Ramdas and Ghislain Fourny

This is the Python edition of [RumbleDB](https://rumbledb.org/), which brings [JSONiq](https://www.jsoniq.org) to the world of Spark and DataFrames. JSONiq is a language considerably more powerful than SQL as it can process [messy, heterogeneous datasets](https://arxiv.org/abs/1910.11582), from kilobytes to Petabytes, with very little coding effort.
This is the Python edition of [RumbleDB](https://rumbledb.org/), which brings [JSONiq](https://www.jsoniq.org) to the world of Python.

The Python edition of RumbleDB is currently only a prototype (alpha) and probably unstable. We welcome bug reports.
JSONiq is a language considerably more powerful than SQL as it can process [messy, heterogeneous datasets](https://arxiv.org/abs/1910.11582), from kilobytes to Petabytes, with very little coding effort.

Spark aficionados can also pass DataFrames to JSONiq queries and take back DataFrames. This gives them an environment in which both Spark SQL and JSONiq co-exist to manipulate the data.

The Python edition of RumbleDB is currently a prototype (alpha) and probably unstable. We welcome bug reports and feedback.

## About RumbleDB

Expand All @@ -24,20 +28,33 @@ A RumbleSession is a wrapper around a SparkSession that additionally makes sure

JSONiq queries are invoked with rumble.jsoniq() in a way similar to the way Spark SQL queries are invoked with spark.sql().

Any number of Python DataFrames can be attached to external JSONiq variables used in the query. It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://rumble.readthedocs.io/en/latest/).
JSONiq variables can be bound to lists of JSON values (str, int, float, True, False, None, dict, list) or to Pyspark DataFrames. A JSONiq query can use as many variables as needed (for example, it can join between different collections).

The resulting sequence of items can be retrieved as DataFrame, as an RDD, as a Python list, or with a streaming iteration over the items.
It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://rumble.readthedocs.io/en/latest/).

The individual items can be processed using the RumbleDB [Item API](https://github.com/RumbleDB/rumble/blob/master/src/main/java/org/rumbledb/api/Item.java).
The resulting sequence of items can be retrieved as a list of JSON values, as a Pyspark DataFrame, or, for advanced users, as an RDD or with a streaming iteration over the items using the [RumbleDB Item API](https://github.com/RumbleDB/rumble/blob/master/src/main/java/org/rumbledb/api/Item.java).

Alternatively, it is possible to directly get a Python list of JSON values, or a streaming iteration of JSON values. This is a convenience that makes it unnecessary to use the Item API, especially for a first-time user.
It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Pyspark.

It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Spark.

The design goal is that it should be possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc.
The design goal is that it is possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc.

Any feedback or error reports are very welcome.

## Type mapping

When passing Python values to JSONiq or getting them from a JSONiq queries, the mapping is as follows:

| Python | JSONiq |
|-------|-------|
|dict|object|
|list|array|
|str|string|
|int|integer|
|bool|boolean|
|None|null|

Furthermore, other JSONiq types will be mapped to string literals. Users who want to preserve JSONiq types can use the Item API instead.

## Installation

Install with
Expand All @@ -49,7 +66,7 @@ pip install jsoniq

## Sample code

We will make more documentation available as we go. In the meantime, you will find a sample code below that should just run
We will make more documentation available as we go. In the meantime, you will find a sample, commented code below that should just run
after installing the library.

You can directly copy paste the code below to a Python file and execute it with Python.
Expand All @@ -60,15 +77,145 @@ from jsoniq import RumbleSession
# The syntax to start a session is similar to that of Spark.
# A RumbleSession is a SparkSession that additionally knows about RumbleDB.
# All attributes and methods of SparkSession are also available on RumbleSession.
rumble = RumbleSession.builder.appName("PyRumbleExample").getOrCreate();

rumble = RumbleSession.builder.getOrCreate();

# Just to improve readability when invoking Spark methods
# (such as spark.sql() or spark.createDataFrame()).
spark = rumble

##############################
###### Your first query ######
##############################

# Even though RumbleDB uses Spark internally, it can be used without any knowledge of Spark.

# Executing a query is done with rumble.jsoniq() like so. A query returns a sequence
# of items, here the sequence with just the integer item 2.
items = rumble.jsoniq('1+1')

# A sequence of items can simply be converted to a list of Python/JSON values with json().
# Since there is only one value in the sequence output by this query,
# we get a singleton list with the integer 2.
# Generally though, the results may contain zero, one, two, or more items.
python_list = items.json()
print(python_list)

############################################
##### More complex, standalone queries #####
############################################

# JSONiq is very powerful and expressive. You will find tutorials as well as a reference on JSONiq.org.

seq = rumble.jsoniq("""

let $stores :=
[
{ "store number" : 1, "state" : "MA" },
{ "store number" : 2, "state" : "MA" },
{ "store number" : 3, "state" : "CA" },
{ "store number" : 4, "state" : "CA" }
]
let $sales := [
{ "product" : "broiler", "store number" : 1, "quantity" : 20 },
{ "product" : "toaster", "store number" : 2, "quantity" : 100 },
{ "product" : "toaster", "store number" : 2, "quantity" : 50 },
{ "product" : "toaster", "store number" : 3, "quantity" : 50 },
{ "product" : "blender", "store number" : 3, "quantity" : 100 },
{ "product" : "blender", "store number" : 3, "quantity" : 150 },
{ "product" : "socks", "store number" : 1, "quantity" : 500 },
{ "product" : "socks", "store number" : 2, "quantity" : 10 },
{ "product" : "shirt", "store number" : 3, "quantity" : 10 }
]
let $join :=
for $store in $stores[], $sale in $sales[]
where $store."store number" = $sale."store number"
return {
"nb" : $store."store number",
"state" : $store.state,
"sold" : $sale.product
}
return [$join]
""");

print(seq.json());

seq = rumble.jsoniq("""
for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10)
group by $store-number := $product.store-number
order by $store-number ascending
return {
"store" : $store-number,
"products" : [ distinct-values($product.product) ]
}
""");
print(seq.json());

############################################################
###### Binding JSONiq variables to Python values ###########
############################################################

# It is possible to bind a JSONiq variable to a list of native Python values
# and then use it in a query.
# JSONiq, variables are bound to sequences of items, just like the results of JSONiq
# queries are sequence of items.
# A Python list will be seamlessly converted to a sequence of items by the library.
# Currently we only support strs, ints, floats, booleans, None, lists, and dicts.
# But if you need more (like date, bytes, etc) we will add them without any problem.
# JSONiq has a rich type system.

rumble.bind('$c', [1,2,3,4, 5, 6])
print(rumble.jsoniq("""
for $v in $c
let $parity := $v mod 2
group by $parity
return { switch($parity)
case 0 return "even"
case 1 return "odd"
default return "?" : $v
}
""").json())

rumble.bind('$c', [[1,2,3],[4,5,6]])
print(rumble.jsoniq("""
for $i in $c
return [
for $j in $i
return { "foo" : $j }
]
""").json())

rumble.bind('$c', [{"foo":[1,2,3]},{"foo":[4,{"bar":[1,False, None]},6]}])
print(rumble.jsoniq('{ "results" : $c.foo[[2]] }').json())

# It is possible to bind only one value. The it must be provided as a singleton list.
# This is because in JSONiq, an item is the same a sequence of one item.
rumble.bind('$c', [42])
print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json())

# For convenience and code readability, you can also use bindOne().
rumble.bindOne('$c', 42)
print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json())


################################################
##### Using Pyspark DataFrames with JSONiq #####
################################################

# The power users can also interface our library with pyspark DataFrames.
# JSONiq sequences of items can have billions of items, and our library supports this
# out of the box: it can also run on clusters on AWS Elastic MapReduce for example.
# But your laptop is just fine, too: it will spread the computations on your cores.
# You can bind a DataFrame to a JSONiq variable. JSONiq will recognize this
# DataFrame as a sequence of object items.

# Create a data frame also similar to Spark (but using the rumble object).
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)];
columns = ["Name", "Age"];
df = rumble.createDataFrame(data, columns);
df = spark.createDataFrame(data, columns);

# This is how to bind a JSONiq variable to a dataframe. You can bind as many variables as you want.
rumble.bindDataFrameAsVariable('$a', df);
rumble.bind('$a', df);

# This is how to run a query. This is similar to spark.sql().
# Since variable $a was bound to a DataFrame, it is automatically declared as an external variable
Expand Down Expand Up @@ -101,18 +248,18 @@ df.show();

# A DataFrame output by JSONiq can be reused as input to a Spark SQL query.
# (Remember that rumble is a wrapper around a SparkSession object, so you can use rumble.sql() just like spark.sql())
df.createTempView("input")
df2 = rumble.sql("SELECT * FROM input").toDF("name");
df.createTempView("myview")
df2 = spark.sql("SELECT * FROM myview").toDF("name");
df2.show();

# A DataFrame output by Spark SQL can be reused as input to a JSONiq query.
rumble.bindDataFrameAsVariable('$b', df2);
rumble.bind('$b', df2);
seq2 = rumble.jsoniq("for $i in 1 to 5 return $b");
df3 = seq2.df();
df3.show();

# And a DataFrame output by JSONiq can be reused as input to another JSONiq query.
rumble.bindDataFrameAsVariable('$b', df3);
rumble.bind('$b', df3);
seq3 = rumble.jsoniq("$b[position() lt 3]");
df4 = seq3.df();
df4.show();
Expand Down Expand Up @@ -170,61 +317,19 @@ seq.write().mode("overwrite").parquet("outputparquet");
seq = rumble.jsoniq("1+1");
seq.write().mode("overwrite").text("outputtext");

############################################
##### More complex, standalone queries #####
############################################

seq = rumble.jsoniq("""

let $stores :=
[
{ "store number" : 1, "state" : "MA" },
{ "store number" : 2, "state" : "MA" },
{ "store number" : 3, "state" : "CA" },
{ "store number" : 4, "state" : "CA" }
]
let $sales := [
{ "product" : "broiler", "store number" : 1, "quantity" : 20 },
{ "product" : "toaster", "store number" : 2, "quantity" : 100 },
{ "product" : "toaster", "store number" : 2, "quantity" : 50 },
{ "product" : "toaster", "store number" : 3, "quantity" : 50 },
{ "product" : "blender", "store number" : 3, "quantity" : 100 },
{ "product" : "blender", "store number" : 3, "quantity" : 150 },
{ "product" : "socks", "store number" : 1, "quantity" : 500 },
{ "product" : "socks", "store number" : 2, "quantity" : 10 },
{ "product" : "shirt", "store number" : 3, "quantity" : 10 }
]
let $join :=
for $store in $stores[], $sale in $sales[]
where $store."store number" = $sale."store number"
return {
"nb" : $store."store number",
"state" : $store.state,
"sold" : $sale.product
}
return [$join]
""");

print(seq.json());

seq = rumble.jsoniq("""
for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10)
group by $store-number := $product.store-number
order by $store-number ascending
return {
"store" : $store-number,
"products" : [ distinct-values($product.product) ]
}
""");
print(seq.json());

```
# How to learn JSONiq, and more query examples

Even more queries can be found [here](https://colab.research.google.com/github/RumbleDB/rumble/blob/master/RumbleSandbox.ipynb) and you can look at the [JSONiq documentation](https://www.jsoniq.org) and tutorials.

# Last updates

## Version 0.1.0 alpha 12
- Allow to bind JSONiq variables to Python values (mapping Python lists to sequences of items). This makes it possible to manipulate Python values directly with JSONiq and even without any knowledge of Spark at all.
- renamed bindDataFrameAsVariable() to bind(), which can be used both with DataFrames and Python lists.
- add bindOne() for binding a single value to a JSONiq variable.
- wrapping df() in a Pyspark DataFrame to make sure it can be used with pyspark DataFrame transformations.

## Version 0.1.0 alpha 11
- Fix an issue when feeding a DataFrame output by rumble.jsoniq() back to a new JSONiq query (as a variable).

Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "jsoniq"
version = "0.1.0a11"
version = "0.1.0a12"
description = "Python edition of RumbleDB, a JSONiq engine"
requires-python = ">=3.11"
dependencies = [
Expand All @@ -21,6 +21,8 @@ classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
]

[tool.setuptools.packages.find]
Expand Down
Binary file modified src/jsoniq/jars/rumbledb-1.24.0.jar
Binary file not shown.
9 changes: 7 additions & 2 deletions src/jsoniq/sequence.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from pyspark import RDD
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import json

class SequenceOfItems:
def __init__(self, sequence, sparkcontext):
def __init__(self, sequence, sparksession):
self._jsequence = sequence
self._sparkcontext = sparkcontext
self._sparkcontext = sparksession.sparkContext
self._sparksession = sparksession

def json(self):
return [json.loads(l.serializeAsJSON()) for l in self._jsequence.items()]
Expand All @@ -15,6 +17,9 @@ def rdd(self):
rdd = RDD(rdd, self._sparkcontext)
return rdd.map(lambda l: json.loads(l))

def df(self):
return DataFrame(self._jsequence.getAsDataFrame(), self._sparksession)

def nextJSON(self):
return self._jsequence.next().serializeAsJSON()

Expand Down
Loading