Ibis Project Blog

Python productivity framework for the Apache Hadoop ecosystem. Development updates, use cases, and internals.

Interactive Analytics on Dynamic Big Data in Python using Kudu, Impala, and Ibis

The new Apache Kudu (incubating) columnar storage engine, together with the Apache Impala (incubating) interactive SQL engine, enables a new, fully open source big data architecture for data that is arriving and changing very quickly. By integrating Kudu and Impala with Ibis, this functionality is now available to Python programmers through an easy-to-use, pandas-like API.

I spent this last week expanding the Kudu Python client (a Cython wrapper for the C++ client API) and adding initial integration with Ibis. While my Kudu patch is still in code review, I will give you a preview here of how it all works.

Since Kudu, a native C++ storage engine, now builds on OS X, I'm writing this post using the Kudu client on OS X. Now is a great time for developers on both OS X and Linux to get involved in developing Python tools for Kudu.

Using Impala on OS X is in the works; see the issue tracker for more details. As soon as we can, we'll provide a combined Kudu/Impala DMG installer for setting up the environment locally on Mac computers.

Read on for more about Kudu and a full stack demo.

Kudu overview

Kudu was designed for the Hadoop ecosystem in part to simplify architectures involving very fast-arriving and fast-changing data that needs to be immediately available for analytical queries. In the past, complex architectures were devised that combined the fast Parquet columnar format stored in HDFS with HBase (which handles new data well but is very slow for analytics), but there were numerous drawbacks that made a purpose-built column-oriented storage engine desirable. For example, while Parquet is extremely fast for analytics, data can only be appended to a dataset, not deleted or updated.

You can read much more about Kudu in Todd Lipcon's recent slide deck and in the overview on the Kudu website.

ZoomData put together a cool demo showing a real-time analytics dashboard powered by Impala and Kudu.

For Python users, here are the key details:

  • Kudu has a SQL-like tabular data model; table columns are typed, and columns can be added to and removed from tables. A Kudu cluster can store any number of tables.
  • Tables must have one or more primary key columns. As in SQL databases, the primary key affects the performance of retrieving individual records.
  • Data is stored in a column-oriented layout, so individual table columns can be read (or scanned) very fast.
  • You can mutate a table by adding, deleting, or updating rows.
  • Data can be selected by specifying a set of conditions, or predicates, that must hold true.
  • Kudu does not perform analytics: its job is to manage tabular data and serve it to compute engines as fast as possible.

Kudu is not coupled to any particular compute engine; any system that can use its C++ or Java clients can work with it. A few compute system integrations are built or in progress, including the Impala integration shown later in this post.

Basic Kudu use in Python

Let's take our first steps in Python. I've booted up the Kudu Quickstart VM (for VirtualBox) that you can download from the Kudu website. I've installed the Kudu Python client and now import it and connect to the Kudu master in the VM:

In [1]:
import kudu
client = kudu.connect('quickstart.cloudera', 7051)

Since this is a brand new cluster, there are no tables created yet:

In [2]:
client.list_tables()
Out[2]:
[]

To create one, we first build a schema using a schema builder and then create the table:

In [3]:
builder = kudu.schema_builder()
builder.add_column('id', kudu.int64, nullable=False)
builder.add_column('item', kudu.string)
builder.add_column('price', kudu.double)

builder.set_primary_keys(['id'])

schema = builder.build()
schema
Out[3]:
kudu.Schema {  
  id     int64(nullable=False) PRIMARY KEY
  item   string(nullable=True)
  price  double(nullable=True)
}
In [4]:
if client.table_exists('purchases'):
    client.delete_table('purchases')

client.create_table('purchases', schema)
client.list_tables()
Out[4]:
['purchases']

Now, we can get a handle for this new table and see its schema:

In [5]:
purchases = client.table('purchases')
purchases
Out[5]:
<kudu.client.Table at 0x1037fc8f0>
In [6]:
purchases.schema
Out[6]:
kudu.Schema {  
  id     int64(nullable=False) PRIMARY KEY
  item   string(nullable=True)
  price  double(nullable=True)
}

Now, let's insert some data:

In [7]:
data = [
    (1, 'spam', 2.49),
    (2, 'eggs', 1.25),
    (3, 'coffee', 2.35),
    (4, 'spam', 2.00),
    (5, 'eggs', 2.49),
    (6, 'coffee', 2.75),
    (7, 'eggs', 2.75),
    (8, 'coffee', 1.95),
    (9, 'spam', 3.00),
    (10, 'eggs', 2.25),
    (11, 'eggs', 2.00),
    (12, 'coffee', 2.35)
]

To add, change, or remove data from a table, you must create a session to group the operations:

In [8]:
session = client.new_session()

Now, you create insert operations, apply them to the session, and call its flush method:

In [9]:
for _id, _item, _price in data:
    op = purchases.new_insert()
    op['id'] = _id
    op['item'] = _item
    op['price'] = _price
    session.apply(op)
    
session.flush()
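
Updates and deletes follow the same session pattern. Here's a sketch (not executed in this session, and assuming this build of the client exposes new_update and new_delete alongside new_insert):

# Sketch only: update the price of row 1 and delete row 2, using the
# primary key to identify the rows to modify.
session = client.new_session()

op = purchases.new_update()
op['id'] = 1
op['price'] = 2.99
session.apply(op)

op = purchases.new_delete()
op['id'] = 2
session.apply(op)

session.flush()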

Now, suppose we wanted to select some data from the table. To do this, we create a scanner for the table in question:

In [10]:
scanner = purchases.scanner()

To read all of the data out, you open the scanner and call one of its read methods:

In [11]:
scanner.open()
scanner.read_all_tuples()
Out[11]:
[(1, 'spam', 2.49),
 (2, 'eggs', 1.25),
 (3, 'coffee', 2.35),
 (4, 'spam', 2.0),
 (5, 'eggs', 2.49),
 (6, 'coffee', 2.75),
 (7, 'eggs', 2.75),
 (8, 'coffee', 1.95),
 (9, 'spam', 3.0),
 (10, 'eggs', 2.25),
 (11, 'eggs', 2.0),
 (12, 'coffee', 2.35)]

To only read a particular subset of data, you add predicates to the scanner:

In [12]:
scanner = purchases.scanner()
scanner.add_predicate(purchases['item'] == 'spam')
scanner.open()
scanner.read_all_tuples()
Out[12]:
[(1, 'spam', 2.49), (4, 'spam', 2.0), (9, 'spam', 3.0)]
In [13]:
scanner = purchases.scanner()
id_col = purchases['id']
scanner.add_predicates([id_col >= 5, id_col <= 10])
scanner.open()
scanner.read_all_tuples()
Out[13]:
[(5, 'eggs', 2.49),
 (6, 'coffee', 2.75),
 (7, 'eggs', 2.75),
 (8, 'coffee', 1.95),
 (9, 'spam', 3.0),
 (10, 'eggs', 2.25)]

That's all we need for now. There's a lot more to Kudu, and plenty more that could be added to the Python interface, such as:

  • Native pandas DataFrame read/write capability (a manual workaround is sketched just after this list)
  • Hash and range partition configuration
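
Until native DataFrame support lands, a simple workaround is to wrap the scanner's tuple output in a pandas DataFrame yourself. A minimal sketch, assuming pandas is installed and supplying the column names from the purchases table:

import pandas as pd

# Build a DataFrame from the scanner's tuple output, passing the column
# names explicitly since the scan returns plain tuples.
scanner = purchases.scanner()
scanner.open()
df = pd.DataFrame.from_records(scanner.read_all_tuples(),
                               columns=['id', 'item', 'price'])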

Querying existing Kudu tables with Ibis and Impala

In the latest development version of Ibis, you can add Kudu to the mix when working with Impala. Let's take a look:

In [14]:
import ibis
ibis.options.interactive = True

host = 'quickstart.cloudera'

hdfs = ibis.hdfs_connect(host, port=50070)
ic = ibis.impala.connect(host, port=21050, hdfs_client=hdfs)

This Impala cluster is built with Kudu support, so I can connect my Ibis client to the Kudu master like so:

In [15]:
ic.kudu.connect(host, 7051)

Now, let's see about that data we just wrote:

In [16]:
ic.kudu.list_tables()
Out[16]:
['purchases']

The table method on ic.kudu automatically creates an Impala table whose metadata references the existing data in Kudu:

In [17]:
purchases = ic.kudu.table('purchases')
In [18]:
purchases
Out[18]:
    id    item  price
0    1    spam   2.49
1    2    eggs   1.25
2    3  coffee   2.35
3    4    spam   2.00
4    5    eggs   2.49
5    6  coffee   2.75
6    7    eggs   2.75
7    8  coffee   1.95
8    9    spam   3.00
9   10    eggs   2.25
10  11    eggs   2.00
11  12  coffee   2.35

The result behaves just like any other Ibis table, such as those you might have used with HDFS or SQLite:

In [19]:
purchases[purchases.item == 'spam']
Out[19]:
   id  item  price
0   1  spam   2.49
1   4  spam   2.00
2   9  spam   3.00
In [20]:
purchases.group_by('item').price.mean()
Out[20]:
     item  mean(price)
0    spam     2.496667
1  coffee     2.350000
2    eggs     2.148000

You can issue SELECT, INSERT, DELETE, and UPDATE queries on data in Kudu tables via Impala, but for now only SELECT and INSERT operations are available from Ibis.
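
For example, appending rows from another Impala table expression might look like the following sketch (the staging table name is hypothetical, and I'm assuming the Kudu-backed table exposes the same insert method as other Ibis Impala tables):

# Sketch only: append rows from a hypothetical staging table into the
# Kudu-backed 'purchases' table via Impala.
staged = ic.table('staged_purchases')   # hypothetical staging table
purchases.insert(staged)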

Creating new Kudu-backed Impala tables

The Impala client's Kudu interface has a method create_table which enables more flexible Impala table creation with data stored in Kudu. This includes:

  • Creating empty tables with a particular schema
  • Creating tables from an Ibis table expression (i.e. a "CTAS" in database speak)
  • Creating tables from pandas DataFrame objects

Let's look at a couple of examples; a sketch of the pandas DataFrame case follows them.

First, I'll create an empty Kudu-backed table:

In [21]:
schema = ibis.schema([('foo', 'int32'),
                      ('bar', 'string'),
                      ('baz', 'double')])

if ic.exists_database('kudu_tables'):
    ic.drop_database('kudu_tables', force=True)

ic.create_database('kudu_tables')
impala_name = 'example1'
kudu_name = 'example1-kudu-table'
ic.kudu.create_table(impala_name, kudu_name, schema=schema,
                     primary_keys=['foo'],
                     database='kudu_tables')

db = ic.database('kudu_tables')
In [22]:
impala_table = db.table(impala_name)
impala_table.schema()
Out[22]:
ibis.Schema {  
  foo  int32
  bar  string
  baz  double
}

Let's look in Kudu!

In [23]:
client.list_tables()
Out[23]:
['purchases', 'example1-kudu-table']
In [24]:
example1 = client.table(kudu_name)
In [25]:
example1.schema
Out[25]:
kudu.Schema {  
  foo  int32(nullable=False) PRIMARY KEY
  bar  string(nullable=True)
  baz  double(nullable=True)
}

Note that Impala itself has no notion of primary keys or non-nullable fields, but this metadata can still inform query planning.

Now, I'll create a table by inserting a subset of the purchases table:

In [26]:
expr = purchases[purchases.item == 'spam']

ic.kudu.create_table('example2', 'example2-kudu', obj=expr,
                     primary_keys=['id'],
                     database='kudu_tables')
In [27]:
db.example2
Out[27]:
   id  item  price
0   1  spam   2.49
1   4  spam   2.00
2   9  spam   3.00

We can look directly in Kudu to see the data that was inserted.

In [28]:
example2_data = client.table('example2-kudu')
scanner = example2_data.scanner()
scanner.open()
scanner.read_all_tuples()
Out[28]:
[(1, 'spam', 2.49), (4, 'spam', 2.0), (9, 'spam', 3.0)]
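
Finally, the third create_table variant builds a Kudu-backed table from a pandas DataFrame. A sketch with made-up data, assuming a DataFrame can be passed via obj= just like a table expression:

import pandas as pd

# Sketch only: create a Kudu-backed Impala table directly from a pandas
# DataFrame, with made-up rows matching the purchases schema.
df = pd.DataFrame({'id': [100, 101],
                   'item': ['tea', 'toast'],
                   'price': [1.50, 1.25]})

ic.kudu.create_table('example3', 'example3-kudu', obj=df,
                     primary_keys=['id'],
                     database='kudu_tables')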

In the context of the "Great Decoupling"

Kudu's "bring your own SQL" design is well in line with the broader decoupling and commoditization of open source storage and compute systems that has been going on the last 10 years. It is the responsibility of productivity-centric programming interfaces like Ibis (which you can think of as a "UI for developers") to enhance interoperability and hide as much complexity from the user as possible.

Conclusions

Kudu is an exciting new open source storage technology that, when combined with a high-performance compute engine like Impala, enables scalable, high-performance analytics on fast-changing data sets. Having this functionality seamlessly available to Python programmers using Ibis will make it much easier to develop end-to-end applications involving big data analytics. Most importantly, the code that you write will be largely the same whether you have 1000 or 100 billion rows of data.

I've been working to build out the Kudu Python interface so that it's easier for Python users to use the project and participate in the development community. If you see something here that interests you, we'd love to have you involved.