Beyond scalability and high performance on large data sets with Python, Ibis is focused on simplifying analytics tasks for end users. By designing a rich pandas-like domain-specific language (DSL) embedded in Python code, we can hide away the complexities normally associated with expressing analytical concepts in SQL or some other tool. This post gives some specific examples and shows how we're solving them in Ibis.
We've all been there: you have some high-cardinality category and you wish to restrict your analysis to a fixed subset of values according to a ranking that you devise. If you're using pandas, you might do something like:
    K = 5
    avg_value = data.groupby('category').value.mean()
    # Series.order() has been removed from pandas; sort_values() replaces it
    top_categories = avg_value.sort_values(ascending=False)[:K].index
    filtered_data = data[data.category.isin(top_categories)]
Let's look at a concrete example using both Ibis and SQL. I'm using the now-classic airlines dataset in Parquet format.
    import ibis
    ibis.options.interactive = True

    hdfs = ibis.hdfs_connect('bottou01.sjc.cloudera.com')
    con = ibis.impala.connect('bottou01.sjc.cloudera.com', hdfs_client=hdfs)

    airlines = con.parquet_file('/user/wesm/airlines-parquet', persist=True,
                                name='airlines_parquet', database='wes')
    # airlines.compute_stats()
Let's look at the top 5 origin airports by total number of flights:
    query = """
    SELECT origin, count(*) as `nrows`
    FROM wes.airlines_parquet
    GROUP BY 1
    ORDER BY nrows DESC
    LIMIT 5
    """
    expr = con.sql(query)
    expr

      origin    nrows
    0    ORD  6597442
    1    ATL  6100953
    2    DFW  5710980
    3    LAX  4089012
    4    PHX  3491077
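Constructed from SQL primitives, the TopK operation consists of 3 tasks: aggregating by the category (GROUP BY), sorting by the metric in descending order (ORDER BY), and limiting the result to K rows (LIMIT).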
Here is the Impala query plan for this (use con.explain(expr) with Ibis to see it):
    05:MERGING-EXCHANGE [UNPARTITIONED]
    |  order by: count(*) DESC
    |  limit: 5
    |
    02:TOP-N [LIMIT=5]
    |  order by: count(*) DESC
    |
    04:AGGREGATE [FINALIZE]
    |  output: count:merge(*)
    |  group by: origin
    |
    03:EXCHANGE [HASH(origin)]
    |
    01:AGGREGATE
    |  output: count(*)
    |  group by: origin
    |
    00:SCAN HDFS [wes.airlines_parquet]
       partitions=1/1 files=8 size=1.34GB
As you can see, Impala, like most SQL engines, has a built-in notion of TOP-N in its query engine, yet this is not expressible directly in SQL. Things get more complicated when you want to filter down to the categories in the result:
    SELECT dest, avg(arrdelay) as `avg_delay`
    FROM wes.airlines_parquet
    WHERE origin in (
      SELECT origin
      FROM (
        SELECT origin, count(*) as `nrows`
        FROM wes.airlines_parquet
        GROUP BY 1
        ORDER BY nrows DESC
        LIMIT 5
      ) t0
    )
    GROUP BY 1
If you look at the query plan for this, it's more complicated but functionally contains the same TopK pattern plus a filter (using a LEFT SEMI JOIN, a type of efficient filter join).
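To make the pattern concrete outside of any engine, here is a minimal plain-Python sketch of the three TopK steps followed by a semi-join-style filter. The rows are made up and the helper names (top_k_origins, avg_delay_by_dest) are hypothetical; this only illustrates the logic and is not how Impala or Ibis implement it.

```python
from collections import Counter, defaultdict

# Made-up rows standing in for the airlines table: (origin, dest, arrdelay).
flights = [
    ("ORD", "SFO", 12), ("ORD", "JFK", 30), ("ORD", "SFO", 0),
    ("ATL", "JFK", 45), ("ATL", "SFO", 6),
    ("DFW", "JFK", 90),
]


def top_k_origins(rows, k):
    # 1. Aggregate: count rows per origin (GROUP BY origin, count(*)).
    counts = Counter(origin for origin, _, _ in rows)
    # 2. Sort: order the groups by the metric, descending (ORDER BY ... DESC).
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    # 3. Limit: keep only the first k groups (LIMIT k).
    return ranked[:k]


def avg_delay_by_dest(rows, keep_origins):
    # Semi-join-style filter: a row survives only if its origin is in the
    # top-K set; nothing from the top-K table is added to the row itself.
    delays = defaultdict(list)
    for origin, dest, delay in rows:
        if origin in keep_origins:
            delays[dest].append(delay)
    return {dest: sum(vals) / len(vals) for dest, vals in delays.items()}


top2 = top_k_origins(flights, 2)
result = avg_delay_by_dest(flights, {origin for origin, _ in top2})
```

Here top2 holds the two busiest origins with their counts, and result maps each destination to its average delay computed only over flights leaving those origins.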
Ibis is happy to let you build your own TopK expression using primitive operations:
    top_origins = (airlines
                   .group_by('origin')
                   .aggregate(airlines.count().name('nrows'))
                   .sort_by(ibis.desc('nrows'))
                   .limit(5))
    top_origins.execute()
This same aggregated table can be used as a filter, too:
    # Replace nulls with 0
    avg_delay = airlines.arrdelay.mean().fillna(0).name('avg_delay')

    subset_airlines = airlines[airlines.origin.isin(top_origins.origin)]
    avg_delay_subset = (subset_airlines
                        .group_by('dest')
                        .aggregate(avg_delay)
                        .sort_by(ibis.desc('avg_delay'))
                        .limit(10))
    avg_delay_subset.execute()
This is quite a bit of data gymnastics, though, and you haven't really modeled the fundamental Top-N task with the API. Because we have the power to create higher-level abstractions and expand them later into concrete (backend-dependent) operations, we can craft a dedicated topk operation:
    top_origins = airlines.origin.topk(5)
The result of topk in Ibis is what's known as an analytic expression. Its behavior depends on the context in which it's used. It is executable as is, yielding the top 5 origins by count.
But you can also use it directly as a table filter:
    expr = (airlines[top_origins]
            .group_by('dest')
            .aggregate(avg_delay)
            .sort_by(ibis.desc('avg_delay'))
            .limit(10))
    expr.execute()
What's going on here is that the top_origins expression is a sort of logical data type that hasn't yet been adapted to any physical data operations. There are a number of ways it can be used, and that adaptation process happens automatically.
You might notice that I've sneakily done two TopK operations in a row, the first ranking by count and the second by average arrival delay. Luckily, topk gives you a way to provide a ranking metric other than count.
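As a rough illustration of what ranking by a custom metric means, here is a plain-Python sketch. The topk helper below is a hypothetical stand-in written for this example (it is not the Ibis implementation), and the rows are made up. The by argument is assumed to be a function that takes the rows of one group and returns a number, with row count as the default.

```python
from collections import defaultdict


def topk(rows, column, k, by=None):
    """Hypothetical helper: top-k values of `column`, ranked by `by(group)`.

    When `by` is omitted, groups are ranked by their row count.
    """
    metric = by if by is not None else len
    groups = defaultdict(list)
    for row in rows:
        groups[row[column]].append(row)
    scored = [(value, metric(group)) for value, group in groups.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


def avg_delay(group):
    # Mean of the non-missing delays; a group with no delays at all scores 0,
    # mirroring the mean().fillna(0) idea used in the post.
    delays = [r["arrdelay"] for r in group if r["arrdelay"] is not None]
    return sum(delays) / len(delays) if delays else 0.0


rows = [
    {"dest": "SFO", "arrdelay": 30},
    {"dest": "SFO", "arrdelay": 10},
    {"dest": "SFO", "arrdelay": None},
    {"dest": "JFK", "arrdelay": 45},
    {"dest": "SEA", "arrdelay": None},
]

by_count = topk(rows, "dest", 2)
by_delay = topk(rows, "dest", 2, by=avg_delay)
```

The same data gives a different ranking depending on the metric: SFO leads by count, while JFK leads by average delay.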
We can combine all this together to do a double-TopN:
    def avg_delay(x):
        return x.arrdelay.mean().fillna(0).name('avg_delay')

    expr = airlines[top_origins].dest.topk(10, by=avg_delay)
    expr.execute()
Another example: bucketing
Why stop there? After seeing lots of use cases in the wild where code reuse and composability would make analysts a great deal more productive, we'll be working to build more and more high-level analytics tools into Ibis.
As another example which I present as mostly code-only, consider the task of creating a discrete binning of arrival delays and computing a number of metrics. I'll let the code speak for itself:
           count    nulls   min   max        sum      mean  approx_nunique
    0  120947440  2587529 -1437  2598  852674931  7.049963            3801
    delay_buckets = [1, 30, 60, 120]

    # Named delay_bin rather than bin to avoid shadowing the Python builtin
    delay_bin = (airlines.arrdelay
                 .fillna(0)
                 .bucket(delay_buckets, include_over=True, include_under=True)
                 .name('delay_bin'))
    delay_summary = airlines.group_by(delay_bin).arrdelay.summary()
    delay_summary
       delay_bin     count    nulls   min   max        sum        mean  approx_nunique
    0          3   4039142        0    60   120  333493735   82.565489              62
    1          4   1594279        0   121  2598  290961107  182.503255            2490
    2          1  44759224        0     1    29  451748992   10.092869              29
    3          0  63108269  2587529 -1437     0 -531724696   -8.425595             471
    4          2   7446526        0    30    59  308195793   41.387862              30
    bin_name = (delay_summary.delay_bin
                .label(['On time', 'Less than 30 min', '30 min to 1 hour',
                        '1 to 2 hours', 'More than 2 hours']))
    expr = delay_summary.mutate(bin_name=bin_name)
    expr.sort_by('delay_bin')

       delay_bin     count    nulls   min   max        sum        mean  approx_nunique           bin_name
    0          0  63108269  2587529 -1437     0 -531724696   -8.425595             471            On time
    1          1  44759224        0     1    29  451748992   10.092869              29   Less than 30 min
    2          2   7446526        0    30    59  308195793   41.387862              30   30 min to 1 hour
    3          3   4039142        0    60   120  333493735   82.565489              62       1 to 2 hours
    4          4   1594279        0   121  2598  290961107  182.503255            2490  More than 2 hours
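To make the bin boundaries in the tables above easier to read, here is a small plain-Python sketch of the numbering. The function name bin_index is hypothetical, and the edge handling is an assumption inferred from the min and max columns in the output (bin 3 spans 60 to 120, bin 4 starts at 121); it is not taken from the Ibis source.

```python
from bisect import bisect_right

DELAY_EDGES = [1, 30, 60, 120]


def bin_index(delay, edges=DELAY_EDGES):
    """Return the bucket number for one delay value.

    Assumed layout: bin 0 is the "under" bucket (below the first edge),
    bins 1..len(edges)-1 are closed on the left, the last edge belongs to
    the final interior bin, and bin len(edges) is the "over" bucket.
    """
    if delay is None:
        delay = 0  # same idea as fillna(0) in the post
    if delay == edges[-1]:
        return len(edges) - 1
    return bisect_right(edges, delay)


samples = [None, -1437, 0, 1, 29, 30, 59, 60, 120, 121, 2598]
bins = [bin_index(d) for d in samples]
```

With these assumptions, missing, early, and on-time arrivals all land in bin 0, and anything beyond two hours lands in bin 4.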
In future posts I'll go into some more detail on category types in Ibis (like the result of bucket) and some of the other tools used here.