Basic Table

Since Accumulo tables are sorted by row ID, each table can be thought of as being indexed by the row ID. Lookups performed by row ID can be executed quickly, by doing a binary search, first across the tablets, and then within a tablet. Clients should choose a row ID carefully in order to support their desired application. A simple rule is to select a unique identifier as the row ID for each entity to be stored and assign all the other attributes to be tracked to be columns under this row ID. For example, if we have the following data in a comma-separated file:

  1. userid,age,address,account-balance

We might choose to store this data using the userid as the rowID, the column name in the column family, and a blank column qualifier:

  1. Mutation m = new Mutation(userid);
  2. m.at().family("age").put(age);
  3. m.at().family("address").put(address);
  4. m.at().family("balance").put(account_balance);
  5. writer.add(m);

We could then retrieve any of the columns for a specific userid by specifying the userid as the range of a scanner and fetching specific columns:

  1. AccumuloClient client = Accumulo.newClient()
  2. .from("/path/to/accumulo-client.properties").build();
  3. Range r = new Range(userid, userid); // single row
  4. Scanner s = client.createScanner("userdata", auths);
  5. s.setRange(r);
  6. s.fetchColumnFamily("age");
  7. for (Entry<Key,Value> entry : s) {
  8. System.out.println(entry.getValue().toString());
  9. }

RowID Design

Often it is necessary to transform the rowID in order to have rows ordered in a way that is optimal for anticipated access patterns. A good example of this is reversing the order of components of internet domain names in order to group rows of the same parent domain together:

  1. com.google.code
  2. com.google.labs
  3. com.google.mail
  4. com.yahoo.mail
  5. com.yahoo.research

Some data may result in the creation of very large rows - rows with many columns. In this case the table designer may wish to split up these rows for better load balancing while keeping them sorted together for scanning purposes. This can be done by appending a random substring at the end of the row:

  1. com.google.code_00
  2. com.google.code_01
  3. com.google.code_02
  4. com.google.labs_00
  5. com.google.mail_00
  6. com.google.mail_01

It could also be done by adding a string representation of some period of time such as date to the week or month:

  1. com.google.code_201003
  2. com.google.code_201004
  3. com.google.code_201005
  4. com.google.labs_201003
  5. com.google.mail_201003
  6. com.google.mail_201004

Appending dates provides the additional capability of restricting a scan to a given date range.

Lexicoders

Since Keys in Accumulo are sorted lexicographically by default, it’s often useful to encode common data types into a byte format in which their sort order corresponds to the sort order in their native form. An example of this is encoding dates and numerical data so that they can perform better during a seek or range search.

The lexicoders are a standard and extensible way of encoding Java types. Here’s an example of a lexicoder that encodes a java Date object so that it sorts lexicographically:

  1. // create new date lexicoder
  2. DateLexicoder dateEncoder = new DateLexicoder();
  3. // truncate time to hours
  4. long epoch = System.currentTimeMillis();
  5. Date hour = new Date(epoch - (epoch % 3600000));
  6. // encode the rowId so that it is sorted lexicographically
  7. Mutation mutation = new Mutation(dateEncoder.encode(hour));
  8. mutation.at().family("colf").qualifier("colq").put(new byte[]{});

If we want to return the most recent date first, we can reverse the sort order with the reverse lexicoder:

  1. // create new date lexicoder and reverse lexicoder
  2. DateLexicoder dateEncoder = new DateLexicoder();
  3. ReverseLexicoder reverseEncoder = new ReverseLexicoder(dateEncoder);
  4. // truncate date to hours
  5. long epoch = System.currentTimeMillis();
  6. Date hour = new Date(epoch - (epoch % 3600000));
  7. // encode the rowId so that it sorts in reverse lexicographic order
  8. Mutation mutation = new Mutation(reverseEncoder.encode(hour));
  9. mutation.at().family("colf").qualifier("colq").put(new byte[]{});

Indexing

In order to support lookups via more than one attribute of an entity, additional indexes can be built. However, because Accumulo tables can support any number of columns without specifying them beforehand, a single additional index will often suffice for supporting lookups of records in the main table. Here, the index has, as the rowID, the Value or Term from the main table, the column families are the same, and the column qualifier of the index table contains the rowID from the main table.

RowID Column Family Column Qualifier Value
Term Field Name MainRowID

Note: We store rowIDs in the column qualifier rather than the Value so that we can have more than one rowID associated with a particular term within the index. If we stored this in the Value we would only see one of the rows in which the value appears since Accumulo is configured by default to return the one most recent value associated with a key.

Lookups can then be done by scanning the Index Table first for occurrences of the desired values in the columns specified, which returns a list of row ID from the main table. These can then be used to retrieve each matching record, in their entirety, or a subset of their columns, from the Main Table.

To support efficient lookups of multiple rowIDs from the same table, the Accumulo client library provides a BatchScanner. Users specify a set of Ranges to the BatchScanner, which performs the lookups in multiple threads to multiple servers and returns an Iterator over all the rows retrieved. The rows returned are NOT in sorted order, as is the case with the basic Scanner interface.

  1. HashSet<Range> matchingRows = new HashSet<Range>();
  2. // first we scan the index for IDs of rows matching our query
  3. try (Scanner indexScanner = client.createScanner("index", auths)) {
  4. indexScanner.setRange(Range.exact("mySearchTerm");
  5. // we retrieve the matching rowIDs and create a set of ranges
  6. for (Entry<Key,Value> entry : indexScanner) {
  7. matchingRows.add(new Range(entry.getKey().getColumnQualifier()));
  8. }
  9. }
  10. // now we pass the set of rowIDs to the batch scanner to retrieve them
  11. try (BatchScanner bscan = client.createBatchScanner("table", auths, 10)) {
  12. bscan.setRanges(matchingRows);
  13. bscan.fetchColumnFamily("attributes");
  14. for (Entry<Key,Value> entry : bscan) {
  15. System.out.println(entry.getValue());
  16. }
  17. }

One advantage of the dynamic schema capabilities of Accumulo is that different fields may be indexed into the same physical table. However, it may be necessary to create different index tables if the terms must be formatted differently in order to maintain proper sort order. For example, real numbers must be formatted differently than their usual notation in order to be sorted correctly. In these cases, usually one index per unique data type will suffice.

Entity-Attribute and Graph Tables

Accumulo is ideal for storing entities and their attributes, especially of the attributes are sparse. It is often useful to join several datasets together on common entities within the same table. This can allow for the representation of graphs, including nodes, their attributes, and connections to other nodes.

Rather than storing individual events, Entity-Attribute or Graph tables store aggregate information about the entities involved in the events and the relationships between entities. This is often preferable when single events aren’t very useful and when a continuously updated summarization is desired.

The physical schema for an entity-attribute or graph table is as follows:

RowID Column Family Column Qualifier Value
EntityID Attribute Name Attribute Value Weight
EntityID Edge Type Related EntityID Weight

For example, to keep track of employees, managers and products the following entity-attribute table could be used. Note that the weights are not always necessary and are set to 0 when not used.

RowID Column Family Column Qualifier Value
E001 name bob 0
E001 department sales 0
E001 hire_date 20030102 0
E001 units_sold P001 780
E002 name george 0
E002 department sales 0
E002 manager_of E001 0
E002 manager_of E003 0
E003 name harry 0
E003 department accounts_recv 0
E003 hire_date 20000405 0
E003 units_sold P002 566
E003 units_sold P001 232
P001 product_name nike_airs 0
P001 product_type shoe 0
P001 in_stock germany 900
P001 in_stock brazil 200
P002 product_name basic_jacket 0
P002 product_type clothing 0
P002 in_stock usa 3454
P002 in_stock germany 700

To allow efficient updating of edge weights, an aggregating iterator can be configured to add the value of all mutations applied with the same key. These types of tables can easily be created from raw events by simply extracting the entities, attributes, and relationships from individual events and inserting the keys into Accumulo each with a count of 1. The aggregating iterator will take care of maintaining the edge weights.

Document-Partitioned Indexing

Using a simple index as described above works well when looking for records that match one of a set of given criteria. When looking for records that match more than one criterion simultaneously, such as when looking for documents that contain all of the words ‘the’ and ‘white’ and ‘house’, there are several issues.

First is that the set of all records matching any one of the search terms must be sent to the client, which incurs a lot of network traffic. The second problem is that the client is responsible for performing set intersection on the sets of records returned to eliminate all but the records matching all search terms. The memory of the client may easily be overwhelmed during this operation.

For these reasons Accumulo includes support for a scheme known as sharded indexing, in which these set operations can be performed at the TabletServers and decisions about which records to include in the result set can be made without incurring network traffic.

This is accomplished via partitioning records into bins that each reside on at most one TabletServer, and then creating an index of terms per record within each bin as follows:

RowID Column Family Column Qualifier Value
BinID Term DocID Weight

Documents or records are mapped into bins by a user-defined ingest application. By storing the BinID as the RowID we ensure that all the information for a particular bin is contained in a single tablet and hosted on a single TabletServer since Accumulo never splits rows across tablets. Storing the Terms as column families serves to enable fast lookups of all the documents within this bin that contain the given term.

Finally, we perform set intersection operations on the TabletServer via a special iterator called the Intersecting Iterator. Since documents are partitioned into many bins, a search of all documents must search every bin. We can use the BatchScanner to scan all bins in parallel. The Intersecting Iterator should be enabled on a BatchScanner within user query code as follows:

  1. Text[] terms = {new Text("the"), new Text("white"), new Text("house")};
  2. try (BatchScanner bscan = client.createBatchScanner(table, auths, 20)) {
  3. IteratorSetting iter = new IteratorSetting(20, "ii", IntersectingIterator.class);
  4. IntersectingIterator.setColumnFamilies(iter, terms);
  5. bscan.addScanIterator(iter);
  6. bscan.setRanges(Collections.singleton(new Range()));
  7. for (Entry<Key,Value> entry : bscan) {
  8. System.out.println(" " + entry.getKey().getColumnQualifier());
  9. }
  10. }

This code effectively has the BatchScanner scan all tablets of a table, looking for documents that match all the given terms. Because all tablets are being scanned for every query, each query is more expensive than other Accumulo scans, which typically involve a small number of TabletServers. This reduces the number of concurrent queries supported and is subject to what is known as the ‘straggler’ problem in which every query runs as slow as the slowest server participating.

Of course, fast servers will return their results to the client which can display them to the user immediately while they wait for the rest of the results to arrive. If the results are unordered this is quite effective as the first results to arrive are as good as any others to the user.