Basic Table
Since Accumulo tables are sorted by row ID, each table can be thought of as being indexed by the row ID. Lookups by row ID are fast because they use a binary search, first across the tablets and then within a tablet. Clients should choose a row ID carefully in order to support their desired application. A simple rule is to select a unique identifier as the row ID for each entity to be stored and to store all the other attributes to be tracked as columns under this row ID. For example, if we have the following data in a comma-separated file:
userid,age,address,account-balance
We might choose to store this data using the userid as the rowID, the column name in the column family, and a blank column qualifier:
Mutation m = new Mutation(userid);
m.at().family("age").put(age);
m.at().family("address").put(address);
m.at().family("balance").put(account_balance);
writer.add(m);
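As a library-free illustration of the mapping above, the following sketch splits one comma-separated record into a row ID and its columns. The class name `UserRecord` and the sample values are hypothetical, and the parser assumes exactly four fields with no quoted commas.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: one CSV record split into a row ID and its columns. */
final class UserRecord {
    final String rowId;
    // column family -> value, in the order the columns were read
    final Map<String, String> columns = new LinkedHashMap<>();

    private UserRecord(String rowId) {
        this.rowId = rowId;
    }

    /** Expects exactly: userid,age,address,account-balance (no quoted commas). */
    static UserRecord parse(String csvLine) {
        String[] f = csvLine.split(",", -1);
        if (f.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + csvLine);
        }
        UserRecord r = new UserRecord(f[0]);   // userid becomes the row ID
        r.columns.put("age", f[1]);
        r.columns.put("address", f[2]);
        r.columns.put("balance", f[3]);
        return r;
    }
}
```

Each entry of `columns` would correspond to one `m.at().family(...).put(...)` call on the Mutation for `rowId`.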
We could then retrieve any of the columns for a specific userid by specifying the userid as the range of a scanner and fetching specific columns:
AccumuloClient client = Accumulo.newClient()
    .from("/path/to/accumulo-client.properties").build();
Range r = new Range(userid, userid); // single row
Scanner s = client.createScanner("userdata", auths);
s.setRange(r);
s.fetchColumnFamily("age");
for (Entry<Key,Value> entry : s) {
  System.out.println(entry.getValue().toString());
}
RowID Design
Often it is necessary to transform the rowID in order to have rows ordered in a way that is optimal for anticipated access patterns. A good example of this is reversing the order of components of internet domain names in order to group rows of the same parent domain together:
com.google.code
com.google.labs
com.google.mail
com.yahoo.mail
com.yahoo.research
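The transformation that produces row IDs like those above can be sketched in a few lines. The class and method names here are hypothetical, and the sketch assumes plain dot-separated host names with no trailing dot.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DomainRowIds {
    private DomainRowIds() {}

    /** Reverses the dot-separated components: "mail.google.com" -> "com.google.mail". */
    static String reverseDomain(String domain) {
        List<String> parts = Arrays.asList(domain.split("\\."));
        Collections.reverse(parts);
        return String.join(".", parts);
    }
}
```

Because the most general component now comes first, all hosts under the same parent domain share a common row ID prefix and therefore sort next to each other.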
Some data may result in the creation of very large rows - rows with many columns. In this case the table designer may wish to split up these rows for better load balancing while keeping them sorted together for scanning purposes. This can be done by appending a random substring at the end of the row:
com.google.code_00
com.google.code_01
com.google.code_02
com.google.labs_00
com.google.mail_00
com.google.mail_01
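A minimal sketch of generating such suffixed row IDs follows. The names are hypothetical, and the two-digit zero-padded suffix is an assumption taken from the example rows above; zero padding matters because it keeps the shards of one logical row in numeric order when sorted as strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Random;

final class ShardedRowIds {
    private ShardedRowIds() {}

    /** Builds "base_NN" with a zero-padded two-digit shard number. */
    static String shardRow(String baseRow, int shard) {
        if (shard < 0 || shard > 99) {
            throw new IllegalArgumentException("shard must fit in two digits: " + shard);
        }
        return String.format(Locale.ROOT, "%s_%02d", baseRow, shard);
    }

    /** Used at write time: picks one of numShards row IDs at random. */
    static String withRandomShard(String baseRow, int numShards, Random rng) {
        return shardRow(baseRow, rng.nextInt(numShards));
    }

    /** Used at read time: every row ID that may hold part of the logical row. */
    static List<String> allShards(String baseRow, int numShards) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < numShards; i++) {
            rows.add(shardRow(baseRow, i));
        }
        return rows;
    }
}
```

A writer would call `withRandomShard` for each new entry, while a reader would scan the range from the first to the last element of `allShards`.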
It could also be done by appending a string representation of some period of time, such as the date truncated to the week or month:
com.google.code_201003
com.google.code_201004
com.google.code_201005
com.google.labs_201003
com.google.mail_201003
com.google.mail_201004
Appending dates provides the additional capability of restricting a scan to a given date range.
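To illustrate the date-range restriction, the sketch below uses a sorted in-memory set as a stand-in for the table and selects the rows of one base row ID between two months. All names are hypothetical, and the `yyyyMM` suffix format is assumed from the example rows above.

```java
import java.util.Locale;
import java.util.NavigableSet;

final class DateSuffixedRows {
    private DateSuffixedRows() {}

    /** Builds "base_yyyyMM", e.g. row("com.google.code", 2010, 4). */
    static String row(String base, int year, int month) {
        return String.format(Locale.ROOT, "%s_%04d%02d", base, year, month);
    }

    /** Rows of one base ID between two months, both inclusive. */
    static NavigableSet<String> scan(NavigableSet<String> table, String base,
                                     int fromYear, int fromMonth,
                                     int toYear, int toMonth) {
        String start = row(base, fromYear, fromMonth);
        String end = row(base, toYear, toMonth);
        // the sorted order makes this a single contiguous slice
        return table.subSet(start, true, end, true);
    }
}
```

With a real table, the same `start` and `end` strings would serve as the bounds of the scan range.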
Lexicoders
Since Keys in Accumulo are sorted lexicographically by default, it’s often useful to encode common data types into a byte format in which their sort order corresponds to the sort order in their native form. An example of this is encoding dates and numerical data so that they can perform better during a seek or range search.
The lexicoders are a standard and extensible way of encoding Java types. Here’s an example of a lexicoder that encodes a Java Date object so that it sorts lexicographically:
// create new date lexicoder
DateLexicoder dateEncoder = new DateLexicoder();
// truncate time to hours
long epoch = System.currentTimeMillis();
Date hour = new Date(epoch - (epoch % 3600000));
// encode the rowId so that it is sorted lexicographically
Mutation mutation = new Mutation(dateEncoder.encode(hour));
mutation.at().family("colf").qualifier("colq").put(new byte[]{});
If we want to return the most recent date first, we can reverse the sort order with the reverse lexicoder:
// create new date lexicoder and reverse lexicoder
DateLexicoder dateEncoder = new DateLexicoder();
ReverseLexicoder reverseEncoder = new ReverseLexicoder(dateEncoder);
// truncate date to hours
long epoch = System.currentTimeMillis();
Date hour = new Date(epoch - (epoch % 3600000));
// encode the rowId so that it sorts in reverse lexicographic order
Mutation mutation = new Mutation(reverseEncoder.encode(hour));
mutation.at().family("colf").qualifier("colq").put(new byte[]{});
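To see why such an encoding is needed and how it can work, here is a self-contained sketch of an order-preserving encoding for `long` values. It is not the byte format used by Accumulo's lexicoders; the class `SortableLongs` and its fixed 8-byte layout are assumptions made for illustration, and key order is modeled as unsigned byte-wise comparison.

```java
final class SortableLongs {
    private SortableLongs() {}

    /** 8 bytes, big-endian, sign bit flipped so negatives sort before positives. */
    static byte[] encode(long v) {
        long flipped = v ^ Long.MIN_VALUE;
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) flipped;   // lowest byte goes last
            flipped >>>= 8;
        }
        return out;
    }

    /** Inverse of encode. */
    static long decode(byte[] b) {
        long v = 0;
        for (byte x : b) {
            v = (v << 8) | (x & 0xFF);
        }
        return v ^ Long.MIN_VALUE;
    }

    /** Complementing every byte reverses the order of equal-length encodings. */
    static byte[] reverse(byte[] b) {
        byte[] out = new byte[b.length];
        for (int i = 0; i < b.length; i++) {
            out[i] = (byte) ~b[i];
        }
        return out;
    }

    /** Lexicographic comparison that treats each byte as unsigned. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

As plain strings, "10" sorts before "2", whereas `compare(encode(2), encode(10))` is negative, matching numeric order. Applying `reverse` to both encodings flips the sign of the comparison, which is the effect the reverse lexicoder is used for above.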
Indexing
In order to support lookups via more than one attribute of an entity, additional indexes can be built. However, because Accumulo tables can support any number of columns without specifying them beforehand, a single additional index will often suffice for supporting lookups of records in the main table. Here, the index has, as the rowID, the Value or Term from the main table, the column families are the same, and the column qualifier of the index table contains the rowID from the main table.
RowID | Column Family | Column Qualifier | Value |
---|---|---|---|
Term | Field Name | MainRowID | |
Note: We store rowIDs in the column qualifier rather than the Value so that we can have more than one rowID associated with a particular term within the index. If we stored this in the Value we would only see one of the rows in which the value appears since Accumulo is configured by default to return the one most recent value associated with a key.
Lookups can then be done by scanning the Index Table first for occurrences of the desired values in the columns specified, which returns a list of row IDs from the main table. These can then be used to retrieve each matching record, either in its entirety or as a subset of its columns, from the Main Table.
To support efficient lookups of multiple rowIDs from the same table, the Accumulo client library provides a BatchScanner. Users specify a set of Ranges to the BatchScanner, which performs the lookups in multiple threads to multiple servers and returns an Iterator over all the rows retrieved. Unlike with the basic Scanner interface, the rows returned are NOT in sorted order.
HashSet<Range> matchingRows = new HashSet<Range>();
// first we scan the index for IDs of rows matching our query
try (Scanner indexScanner = client.createScanner("index", auths)) {
  indexScanner.setRange(Range.exact("mySearchTerm"));
  // we retrieve the matching rowIDs and create a set of ranges
  for (Entry<Key,Value> entry : indexScanner) {
    matchingRows.add(new Range(entry.getKey().getColumnQualifier()));
  }
}
// now we pass the set of rowIDs to the batch scanner to retrieve them
try (BatchScanner bscan = client.createBatchScanner("table", auths, 10)) {
  bscan.setRanges(matchingRows);
  bscan.fetchColumnFamily("attributes");
  for (Entry<Key,Value> entry : bscan) {
    System.out.println(entry.getValue());
  }
}
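The two-step lookup above can also be modeled entirely in memory, which makes the index layout easier to see. This is only a sketch with hypothetical names: sorted maps stand in for the main and index tables, and the set of rowIDs per term and field plays the role of the column qualifiers in the index table.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

final class InMemoryIndex {
    // main table: rowID -> (field -> value)
    private final NavigableMap<String, Map<String, String>> main = new TreeMap<>();
    // index table: term -> (field -> main rowIDs)
    private final NavigableMap<String, Map<String, NavigableSet<String>>> index = new TreeMap<>();

    /** Writes one attribute and its index entry. Removing stale index entries on update is omitted. */
    void put(String rowId, String field, String value) {
        main.computeIfAbsent(rowId, k -> new TreeMap<>()).put(field, value);
        index.computeIfAbsent(value, k -> new TreeMap<>())
             .computeIfAbsent(field, k -> new TreeSet<>())
             .add(rowId);
    }

    /** Step 1: rowIDs of all records whose field equals the term. */
    NavigableSet<String> lookupRowIds(String term, String field) {
        Map<String, NavigableSet<String>> byField = index.get(term);
        if (byField == null || !byField.containsKey(field)) {
            return new TreeSet<String>();
        }
        return new TreeSet<String>(byField.get(field));
    }

    /** Step 2: fetch the full records for the rowIDs found in step 1. */
    List<Map<String, String>> fetch(Collection<String> rowIds) {
        List<Map<String, String>> records = new ArrayList<>();
        for (String id : rowIds) {
            Map<String, String> rec = main.get(id);
            if (rec != null) {
                records.add(rec);
            }
        }
        return records;
    }
}
```

Because several rowIDs can sit under the same term and field, a term that appears in many records yields all of them, which is the reason given in the note above for keeping rowIDs out of the Value.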
One advantage of the dynamic schema capabilities of Accumulo is that different fields may be indexed into the same physical table. However, it may be necessary to create different index tables if the terms must be formatted differently in order to maintain proper sort order. For example, real numbers must be formatted differently than their usual notation in order to be sorted correctly. In these cases, usually one index per unique data type will suffice.
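As one example of such formatting, real numbers can be written as fixed-width, zero-padded text so that string order agrees with numeric order. The sketch below is an assumption-laden illustration, not a format prescribed by Accumulo: the class name is hypothetical, and it only handles non-negative values up to 99,999,999.999 with three decimal places.

```java
import java.util.Locale;

final class SortableTerms {
    private SortableTerms() {}

    /** Formats a non-negative value as 12 characters, e.g. 8 integer digits, a dot, 3 decimals. */
    static String format(double v) {
        if (!(v >= 0) || v > 99_999_999.999) {   // also rejects NaN
            throw new IllegalArgumentException("out of supported range: " + v);
        }
        // adding 0.0 turns -0.0 into 0.0 so no minus sign is printed
        return String.format(Locale.ROOT, "%012.3f", v + 0.0);
    }
}
```

In their usual notation "9.5" sorts after "10.25", while the formatted terms sort in numeric order. Negative numbers would need a different scheme, which is one reason a separate index per data type may be simpler.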
Entity-Attribute and Graph Tables
Accumulo is ideal for storing entities and their attributes, especially if the attributes are sparse. It is often useful to join several datasets together on common entities within the same table. This can allow for the representation of graphs, including nodes, their attributes, and connections to other nodes.
Rather than storing individual events, Entity-Attribute or Graph tables store aggregate information about the entities involved in the events and the relationships between entities. This is often preferable when single events aren’t very useful and when a continuously updated summarization is desired.
The physical schema for an entity-attribute or graph table is as follows:
RowID | Column Family | Column Qualifier | Value |
---|---|---|---|
EntityID | Attribute Name | Attribute Value | Weight |
EntityID | Edge Type | Related EntityID | Weight |
For example, to keep track of employees, managers and products the following entity-attribute table could be used. Note that the weights are not always necessary and are set to 0 when not used.
RowID | Column Family | Column Qualifier | Value |
---|---|---|---|
E001 | name | bob | 0 |
E001 | department | sales | 0 |
E001 | hire_date | 20030102 | 0 |
E001 | units_sold | P001 | 780 |
E002 | name | george | 0 |
E002 | department | sales | 0 |
E002 | manager_of | E001 | 0 |
E002 | manager_of | E003 | 0 |
E003 | name | harry | 0 |
E003 | department | accounts_recv | 0 |
E003 | hire_date | 20000405 | 0 |
E003 | units_sold | P002 | 566 |
E003 | units_sold | P001 | 232 |
P001 | product_name | nike_airs | 0 |
P001 | product_type | shoe | 0 |
P001 | in_stock | germany | 900 |
P001 | in_stock | brazil | 200 |
P002 | product_name | basic_jacket | 0 |
P002 | product_type | clothing | 0 |
P002 | in_stock | usa | 3454 |
P002 | in_stock | germany | 700 |
To allow efficient updating of edge weights, an aggregating iterator can be configured to add the value of all mutations applied with the same key. These types of tables can easily be created from raw events by simply extracting the entities, attributes, and relationships from individual events and inserting the keys into Accumulo each with a count of 1. The aggregating iterator will take care of maintaining the edge weights.
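The effect of such an aggregating iterator can be modeled in memory as follows. This sketch does not use the Accumulo iterator API; the class `EdgeWeights` is hypothetical, and a map keyed by entity, edge type, and related entity stands in for the table.

```java
import java.util.Map;
import java.util.TreeMap;

final class EdgeWeights {
    // key: entity, edge type, related entity, joined with a NUL separator
    private final Map<String, Long> weights = new TreeMap<>();

    /** Records one event involving the edge, i.e. an insert with a count of 1. */
    void observe(String entity, String edgeType, String related) {
        add(entity, edgeType, related, 1L);
    }

    /** Values written under the same key are summed rather than replaced. */
    void add(String entity, String edgeType, String related, long delta) {
        weights.merge(key(entity, edgeType, related), delta, Long::sum);
    }

    /** Current weight of the edge, or 0 if it was never written. */
    long weight(String entity, String edgeType, String related) {
        return weights.getOrDefault(key(entity, edgeType, related), 0L);
    }

    private static String key(String entity, String edgeType, String related) {
        return entity + '\u0000' + edgeType + '\u0000' + related;
    }
}
```

The ingest side never needs to read the current weight before writing; it only emits increments, and the summation happens where the data is stored.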
Document-Partitioned Indexing
Using a simple index as described above works well when looking for records that match one of a set of given criteria. When looking for records that match more than one criterion simultaneously, such as when looking for documents that contain all of the words ‘the’ and ‘white’ and ‘house’, there are several issues.
First is that the set of all records matching any one of the search terms must be sent to the client, which incurs a lot of network traffic. The second problem is that the client is responsible for performing set intersection on the sets of records returned to eliminate all but the records matching all search terms. The memory of the client may easily be overwhelmed during this operation.
For these reasons Accumulo includes support for a scheme known as sharded indexing, in which these set operations can be performed at the TabletServers and decisions about which records to include in the result set can be made without incurring network traffic.
This is accomplished via partitioning records into bins that each reside on at most one TabletServer, and then creating an index of terms per record within each bin as follows:
RowID | Column Family | Column Qualifier | Value |
---|---|---|---|
BinID | Term | DocID | Weight |
Documents or records are mapped into bins by a user-defined ingest application. By storing the BinID as the RowID we ensure that all the information for a particular bin is contained in a single tablet and hosted on a single TabletServer since Accumulo never splits rows across tablets. Storing the Terms as column families serves to enable fast lookups of all the documents within this bin that contain the given term.
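How documents are assigned to bins is left to the ingest application. One possible choice, sketched below under the assumption that an even spread matters more than locality, is to hash the document ID. The class name, the four-digit BinID format, and the use of `String.hashCode` are all illustrative assumptions.

```java
import java.util.Locale;

final class Bins {
    private Bins() {}

    /** Maps a document ID to one of numBins zero-padded BinIDs such as "0007". */
    static String binFor(String docId, int numBins) {
        if (numBins < 1 || numBins > 10_000) {
            throw new IllegalArgumentException("numBins must be in 1..10000: " + numBins);
        }
        // floorMod keeps the result non-negative even for negative hash codes
        int bin = Math.floorMod(docId.hashCode(), numBins);
        return String.format(Locale.ROOT, "%04d", bin);
    }
}
```

The returned string would be used as the RowID for every term of that document, so all index entries for the document land in the same bin.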
Finally, we perform set intersection operations on the TabletServer via a special iterator called the Intersecting Iterator. Since documents are partitioned into many bins, a search of all documents must search every bin. We can use the BatchScanner to scan all bins in parallel. The Intersecting Iterator should be enabled on a BatchScanner within user query code as follows:
Text[] terms = {new Text("the"), new Text("white"), new Text("house")};
try (BatchScanner bscan = client.createBatchScanner(table, auths, 20)) {
  IteratorSetting iter = new IteratorSetting(20, "ii", IntersectingIterator.class);
  IntersectingIterator.setColumnFamilies(iter, terms);
  bscan.addScanIterator(iter);
  bscan.setRanges(Collections.singleton(new Range()));
  for (Entry<Key,Value> entry : bscan) {
    System.out.println(" " + entry.getKey().getColumnQualifier());
  }
}
This code effectively has the BatchScanner scan all tablets of a table, looking for documents that match all the given terms. Because all tablets are being scanned for every query, each query is more expensive than other Accumulo scans, which typically involve a small number of TabletServers. This reduces the number of concurrent queries supported and is subject to what is known as the ‘straggler’ problem, in which every query runs as slowly as the slowest server participating.
Of course, fast servers will return their results to the client which can display them to the user immediately while they wait for the rest of the results to arrive. If the results are unordered this is quite effective as the first results to arrive are as good as any others to the user.
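The per-bin intersection described in this section can be modeled in memory to show why only matching document IDs need to leave each bin. This is a sketch with hypothetical names and does not reproduce how the Intersecting Iterator is implemented; nested maps stand in for the BinID, Term, and DocID layout of the table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

final class ShardedIndex {
    // BinID -> Term -> DocIDs
    private final Map<String, Map<String, NavigableSet<String>>> bins = new TreeMap<>();

    /** Indexes every term of a document inside the bin the document was assigned to. */
    void index(String binId, String docId, String... terms) {
        Map<String, NavigableSet<String>> bin = bins.computeIfAbsent(binId, k -> new TreeMap<>());
        for (String term : terms) {
            bin.computeIfAbsent(term, k -> new TreeSet<>()).add(docId);
        }
    }

    /** Documents containing all terms; the intersection is computed separately inside each bin. */
    List<String> search(String... terms) {
        List<String> hits = new ArrayList<>();
        for (Map<String, NavigableSet<String>> bin : bins.values()) {
            NavigableSet<String> result = null;
            for (String term : terms) {
                NavigableSet<String> docs = bin.get(term);
                if (docs == null) {
                    result = new TreeSet<String>();   // term absent from this bin
                    break;
                }
                if (result == null) {
                    result = new TreeSet<String>(docs);
                } else {
                    result.retainAll(docs);
                }
                if (result.isEmpty()) {
                    break;
                }
            }
            if (result != null) {
                hits.addAll(result);   // only the matches leave the bin
            }
        }
        return hits;
    }
}
```

Since every document lives in exactly one bin, the union of the per-bin results equals the intersection over the whole collection, and the bins can be processed in any order or in parallel.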