Accumulo SortedKeyValueIterators, commonly referred to as Iterators for short, are server-side programming constructs that allow users to implement custom retrieval or computational purpose within Accumulo TabletServers. The name rightly brings forward similarities to the Java Iterator interface; however, Accumulo Iterators are more complex than Java Iterators. Notably, in addition to the expected methods to retrieve the current element and advance to the next element in the iteration, Accumulo Iterators must also support the ability to “move” (seek) to a specified point in the iteration (the Accumulo table). Accumulo Iterators are designed to be concatenated together, similar to applying a series of transformations to a list of elements. Accumulo Iterators can duplicate their underlying source to create multiple “pointers” over the same underlying data (which is extremely powerful since each stream is sorted) or they can merge multiple Iterators into a single view. In this sense, a collection of Iterators operating in tandem is closer to a tree-structure than a list, but there is always a sense of a flow of Key-Value pairs through some Iterators. Iterators are not designed to act as triggers nor are they designed to operate outside of the purview of a single table.
This guide aims to provide a more detailed description of how Iterators are invoked, some best practices and some common pitfalls.
Instantiation
To invoke an Accumulo Iterator inside of the TabletServer, the Iterator class must be on the classpath of every TabletServer. It is common to place a JAR file which contains the Iterator in lib/. Accumulo references the Iterator class by name and uses Java reflection to instantiate the Iterator. This means that Iterators must have a public no-args constructor.
Interface
A normal implementation of the SortedKeyValueIterator defines functionality for the following methods:
void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException;
boolean hasTop();
void next() throws IOException;
void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException;
Key getTopKey();
Value getTopValue();
SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env);
init
The init method is called by the TabletServer after it constructs an instance of the Iterator. This method should clear/reset any internal state in the Iterator and prepare it to process data. The first argument, the source, is the Iterator “below” this Iterator (where the client is at “top” and the Iterator for files in HDFS are at the “bottom”). The “source” Iterator provides the Key-Value pairs which this Iterator will operate upon.
The second argument, a Map of options, is made up of options provided by the user, options set in the table’s configuration, and/or options set in the containing namespace’s configuration. These options allow for Iterators to dynamically configure themselves on the fly. If no options are used in the current context (a Scan or Compaction), the Map will be empty. An example of a configuration item for an Iterator could be a pattern used to filter Key-Value pairs in a regular expression Iterator.
The third argument, the IteratorEnvironment, is a special object which provides information to this Iterator about the context in which it was invoked. Commonly, this information is not necessary to inspect. For example, if an Iterator knows that it is running in the context of a full-major compaction (reading all of the data) as opposed to a user scan (which may strongly limit the number of columns), the Iterator might make different algorithmic decisions in an attempt to optimize itself.
seek
The purpose of the seek method is to advance the stream of Key-Value pairs to a certain point in the iteration (the Accumulo table). It is common that before the implementation of this method returns some additional processing is performed which may further advance the current position past the startKey of the Range. This, however, is dependent on the functionality the iterator provides. For example, a filtering iterator would consume a number Key-Value pairs which do not meet its criteria before seek returns. The important condition for seek to meet is that this Iterator should be ready to return the first Key-Value pair, or none if no such pair is available, when the method returns. The Key-Value pair would be returned by getTopKey and getTopValue, respectively, and hasTop should return a boolean denoting whether or not there is a Key-Value pair to return.
The arguments passed to seek are as follows:
The TabletServer first provides a Range, an object which defines some collection of Accumulo Keys, which defines the Key-Value pairs that this Iterator should return. Each Range has a startKey and endKey with an inclusive flag for both. While this Range is often similar to the Range(s) set by the client on a Scanner or BatchScanner, it is not guaranteed to be a Range that the client set. Accumulo will split up larger ranges and group them together based on Tablet boundaries per TabletServer. Iterators should not attempt to implement any custom logic based on the Range(s) provided to seek and Iterators should not return any Keys that fall outside of the provided Range.
The second argument, a Collection<ByteSequence>, is the set of column families which should be retained or excluded by this Iterator. The third argument, a boolean, defines whether the collection of column families should be treated as an inclusion collection (true) or an exclusion collection (false).
It is likely that all implementations of seek will first make a call to the seek method on the “source” Iterator that was provided in the init method. The collection of column families and the boolean include argument should be passed down as well as the Range. Somewhat commonly, the Iterator will also implement some sort of additional logic to find or compute the first Key-Value pair in the provided Range. For example, a regular expression Iterator would consume all records which do not match the given pattern before returning from seek.
It is important to retain the original Range passed to this method to know when this Iterator should stop reading more Key-Value pairs. Ignoring this typically does not affect scans from a Scanner, but it will result in duplicate keys emitting from a BatchScanner if the scanned table has more than one tablet. Best practice is to never emit entries outside the seek range.
next
The next method is analogous to the next method on a Java Iterator: this method should advance the Iterator to the next Key-Value pair. For implementations that perform some filtering or complex logic, this may result in more than one Key-Value pair being inspected. This method alters some internal state that is exposed via the hasTop, getTopKey, and getTopValue methods.
The result of this method is commonly caching a Key-Value pair which getTopKey and getTopValue can later return. While there is another Key-Value pair to return, hasTop should return true. If there are no more Key-Value pairs to return from this Iterator since the last call to seek, hasTop should return false.
hasTop
The hasTop method is similar to the hasNext method on a Java Iterator in that it informs the caller if there is a Key-Value pair to be returned. If there is no pair to return, this method should return false. Like a Java Iterator, multiple calls to hasTop (without calling next) should not alter the internal state of the Iterator.
getTopKey and getTopValue
These methods simply return the current Key-Value pair for this iterator. If hasTop returns true, both of these methods should return non-null objects. If hasTop returns false, it is undefined what these methods should return. Like hasTop, multiple calls to these methods should not alter the state of the Iterator.
Users should take caution when either
- caching the Key/Value from getTopKey/getTopValue, for use after callingnexton the source iterator. In this case, the cached Key/Value object is aliased to the reference returned by the source iterator. Iterators may reuse the same Key/Value object in anextcall for performance reasons, changing the data that the cached Key/Value object references and resulting in a logic bug.
- modifying the Key/Value from getTopKey/getTopValue. If the source iterator reuses data stored in the Key/Value, then the source iterator may use the modified data that the Key/Value references. This may/may not result in a logic bug.
In both cases, copying the Key/Value’s data into a new object ensures iterator correctness. If neither case applies, it is safe to not copy the Key/Value. The general guideline is to be aware of who else may use Key/Value objects returned from getTopKey/getTopValue.
deepCopy
The deepCopy method is similar to the clone method from the Java Cloneable interface. Implementations of this method should return a new object of the same type as the Accumulo Iterator instance it was called on. Any internal state from the instance deepCopy was called on should be carried over to the returned copy. The returned copy should be ready to have seek called on it. The SortedKeyValueIterator interface guarantees that init will be called on an iterator before deepCopy and that init will not be called on the iterator returned by deepCopy.
Typically, implementations of deepCopy call a copy-constructor which will initialize internal data structures. As with seek, it is common for the IteratorEnvironment argument to be ignored as most Iterator implementations can be written without the explicit information the environment provides.
In the analogy of a series of Iterators representing a tree, deepCopy can be thought of as early programming assignments which implement their own tree data structures. deepCopy calls copy on its sources (the children), copies itself, attaches the copies of the children, and then returns itself.
Yielding Interface
If you have implemented an iterator with a next or seek call that can take a very long time resulting in starving out other scans within the same thread pool, try implementing the optional YieldingKeyValueIterator interface which SortedKeyValueIterator extends.
default void enableYielding(YieldCallback callback) { }
enableYielding
The implementation of this method should simply cache the supplied callback as a member of the iterator. Then one can call the yield(Key key) method on the callback within a next or seek call when the iterator is to yield control. The supplied key will be used as the start key in a follow-on seek call’s range allowing the iterator to continue where it left off. Note when an iterator yields, the hasTop() method must return false. Also note that the enableYielding method will not be called in isolation mode.
TabletServer invocation of Iterators
The following code is a general outline for how TabletServers invoke Iterators.
List<KeyValue> batch;
Range range = getRangeFromClient();
while (!overSizeLimit(batch)) {
SortedKeyValueIterator source = getSystemIterator();
for (String clzName : getUserIterators()) {
Class<?> clz = Class.forName(clzName);
SortedKeyValueIterator iter = (SortedKeyValueIterator) clz.newInstance();
iter.init(source, opts, env);
source = iter;
}
// read a batch of data to return to client from
// the last iterator, the "top"
SortedKeyValueIterator topIter = source;
YieldCallback cb = new YieldCallback();
topIter.enableYielding(cb)
topIter.seek(range, ...)
while (topIter.hasTop() && !overSizeLimit(batch)) {
key = topIter.getTopKey()
val = topIter.getTopValue()
batch.add(new KeyValue(key, val)
// remember the last key returned
setLastKeyReturned(key);
if (systemDataSourcesChanged()) {
// code does not show isolation case, which will
// keep using same data sources until a row boundary is hit
range = new Range(key, false, range.endKey(), range.endKeyInclusive());
break;
}
topIter.next()
}
if (cb.hasYielded()) {
// remember the yield key as the last key returned
setLastKeyReturned(cb.getKey());
break;
}
}
//return batch of key values to client
Additionally, the obtuse “re-seek” case can be outlined as the following:
// Given the above
List<KeyValue> batch = getNextBatch();
// thread goes away (client stops asking for the next batch).
// Eventually client comes back
// Setup as before...
Range userRange = getRangeFromClient();
Range actualRange = new Range(getLastKeyReturned(), false, userRange.getEndKey(), userRange.isEndKeyInclusive());
// Use the actualRange, not the user provided one
topIter.seek(actualRange);
Isolation
Accumulo provides a feature which clients can enable to prevent the viewing of partially applied mutations within the context of rows. If a client is submitting multiple column updates to rows at a time, isolation would ensure that a client would either see all of updates made to that row or none of the updates (until they are all applied).
When using Isolation, there are additional concerns in iterator design. A scan time iterator in accumulo reads from a set of data sources. While an iterator is reading data it has an isolated view. However, after it returns a key/value it is possible that accumulo may switch data sources and re-seek the iterator. This is done so that resources may be reclaimed. When the user does not request isolation this can occur after any key is returned. When a user enables Isolation, this will only occur after a new row is returned, in which case it will re-seek to the very beginning of the next possible row.
Abstract Iterators
A number of Abstract implementations of Iterators are provided to allow for faster creation of common patterns. The most commonly used abstract implementations are the Filter and Combiner classes. When possible these classes should be used instead as they have been thoroughly tested inside Accumulo itself.
Filter
The Filter abstract Iterator provides a very simple implementation which allows implementations to define whether or not a Key-Value pair should be returned via an accept(Key, Value) method.
Filters are extremely simple to implement; however, when the implementation is filtering a large percentage of Key-Value pairs with respect to the total number of pairs examined, it can be very inefficient. For example, if a Filter implementation can determine after examining part of the row that no other pairs in this row will be accepted, there is no mechanism to efficiently skip the remaining Key-Value pairs. Concretely, take a row which is comprised of 1000 Key-Value pairs. After examining the first 10 Key-Value pairs, it is determined that no other Key-Value pairs in this row will be accepted. The Filter must still examine each remaining 990 Key-Value pairs in this row. Another way to express this deficiency is that Filters have no means to leverage the seek method to efficiently skip large portions of Key-Value pairs.
As such, the Filter class functions well for filtering small amounts of data, but is inefficient for filtering large amounts of data. The decision to use a Filter strongly depends on the use case and distribution of data being filtered.
Combiner
The Combiner class is another common abstract Iterator. Similar to the Combiner interface define in Hadoop’s MapReduce framework, implementations of this abstract class reduce multiple Values for different versions of a Key (Keys which only differ by timestamps) into one Key-Value pair. Combiners provide a simple way to implement common operations like summation and aggregation without the need to implement the entire Accumulo Iterator interface.
One important consideration when choosing to design a Combiner is that the “reduction” operation is often best represented when it is associative and commutative. Operations which do not meet these criteria can be implemented; however, the implementation can be difficult.
A second consideration is that a Combiner is not guaranteed to see every Key-Value pair which differ only by timestamp every time it is invoked. For example, if there are 5 Key-Value pairs in a table which only differ by the timestamps 1, 2, 3, 4, and 5, it is not guaranteed that every invocation of the Combiner will see 5 timestamps. One invocation might see the Values for Keys with timestamp 1 and 4, while another invocation might see the Values for Keys with the timestamps 1, 2, 4 and 5.
Finally, when configuring an Accumulo table to use a Combiner, be sure to disable the Versioning Iterator or set the Combiner at a priority less than the Combiner (the Versioning Iterator is added at a priority of 20 by default). The Versioning Iterator will filter out multiple Key-Value pairs that differ only by timestamp and return only the Key-Value pair that has the largest timestamp.
Combiner Applications
Many applications can benefit from the ability to aggregate values across common keys. This can be done via Combiner iterators and is similar to the Reduce step in MapReduce. This provides the ability to define online, incrementally updated analytics without the overhead or latency associated with batch-oriented MapReduce jobs.
All that is needed to aggregate values of a table is to identify the fields over which values will be grouped, insert mutations with those fields as the key, and configure the table with a combining iterator that supports the summarizing operation desired.
The only restriction on a combining iterator is that the combiner developer should not assume that all values for a given key have been seen, since new mutations can be inserted at anytime. This precludes using the total number of values in the aggregation such as when calculating an average, for example.
An interesting use of combining iterators within an Accumulo table is to store feature vectors for use in machine learning algorithms. For example, many algorithms such as k-means clustering, support vector machines, anomaly detection, etc. use the concept of a feature vector and the calculation of distance metrics to learn a particular model. The columns in an Accumulo table can be used to efficiently store sparse features and their weights to be incrementally updated via the use of a combining iterator.
Best practices
Because of the flexibility that the SortedKeyValueIterator interface provides, it doesn’t directly disallow many implementations which are poor design decisions. The following are some common recommendations to follow and pitfalls to avoid in Iterator implementations.
Avoid special logic encoded in Ranges
Commonly, granular Ranges that a client passes to an Iterator from a Scanner or BatchScanner are unmodified. If a Range falls within the boundaries of a Tablet, an Iterator will often see that same Range in the seek method. However, there is no guarantee that the Range will remain unaltered from client to server. As such, Iterators should never make assumptions about the current state/context based on the Range.
The common failure condition is referred to as a “re-seek”. In the context of a Scan, TabletServers construct the “stack” of Iterators and batch up Key-Value pairs to send back to the client. When a sufficient number of Key-Value pairs are collected, it is common for the Iterators to be “torn down” until the client asks for the next batch of Key-Value pairs. This is done by the TabletServer to add fairness in ensuring one Scan does not monopolize the available resources. When the client asks for the next batch, the implementation modifies the original Range so that servers know the point to resume the iteration (to avoid returning duplicate Key-Value pairs). Specifically, the new Range is created from the original but is shortened by setting the startKey of the original Range to the Key last returned by the Scan, non-inclusive.
seeking backwards
The ability for an Iterator to “skip over” large blocks of Key-Value pairs is a major tenet behind Iterators. By seek‘ing when it is known that there is a collection of Key-Value pairs which can be ignored can greatly increase the speed of a scan as many Key-Value pairs do not have to be deserialized and processed.
While the seek method provides the Range that should be used to seek the underlying source Iterator, there is no guarantee that the implementing Iterator uses that Range to perform the seek on its “source” Iterator. As such, it is possible to seek to any Range and the interface has no assertions to prevent this from happening.
Since Iterators are allowed to seek to arbitrary Keys, it also allows Iterators to create infinite loops inside Scans that will repeatedly read the same data without end. If an arbitrary Range is constructed, it should construct a completely new Range as it allows for bugs to be introduced which will break Accumulo.
Thus, seek’s should always be thought of as making “forward progress” in the view of the total iteration. The startKey of a Range should always be greater than the current Key seen by the Iterator while the endKey of the Range should always retain the original endKey (and endKey inclusivity) of the last Range seen by your Iterator’s implementation of seek.
Take caution in constructing new data in an Iterator
Implementations of Iterator might be tempted to open BatchWriters inside of an Iterator as a means to implement triggers for writing additional data outside of their client application. The lifecycle of an Iterator is not managed in such a way that guarantees that this is safe nor efficient. Specifically, there is no way to guarantee that the internal ThreadPool inside of the BatchWriter is closed (and the thread(s) are reaped) without calling the close() method. close‘ing and recreating a BatchWriter after every Key-Value pair is also prohibitively performance limiting to be considered an option.
The only safe way to generate additional data in an Iterator is to alter the current Key-Value pair. For example, the WholeRowIterator serializes the all of the Key-Values pairs that fall within each row. A safe way to generate more data in an Iterator would be to construct an Iterator that is “higher” (at a larger priority) than the WholeRowIterator, that is, the Iterator receives the Key-Value pairs which are a serialization of many Key-Value pairs. The custom Iterator could deserialize the pairs, compute some function, and add a new Key-Value pair to the original collection, re-serializing the collection of Key-Value pairs back into a single Key-Value pair.
Any other situation is likely not guaranteed to ensure that the caller (a Scan or a Compaction) will always see all intended data that is generated.
Final things to remember
Some simple recommendations/points to keep in mind:
Method call order
On an instance of an Iterator: init is always called before seek, seek is always called before hasTop, getTopKey and getTopValue will not be called if hasTop returns false.
Teardown
Instances of iterators may be torn down inside the server transparently. When a complex collection of iterators is performing advanced functionality, they will not be torn down until a Key-Value pair is returned out of the “stack” of iterators (and added into the batch of Key-Values to be returned to the caller), or the iterator is yielded.
When an iterator is torn down, the entire stack is dropped and no state is preserved. Only the last key returned (or the yielded position), original options, and seek range are retained. When the scan is continued, the iterator stack is rebuilt and re-initialized using the original options. The stack is then seeked with the original range, and the start key is replaced by the last key returned (or the yielded position), non-inclusive.
Compaction-time Iterators
When Iterators are configured to run during compactions, at the minc or majc scope, these Iterators sometimes need to make different assertions than those who only operate at scan time. Iterators won’t see the delete entries; however, Iterators will not necessarily see all of the Key-Value pairs in ever invocation. Because compactions often do not rewrite all files (only a subset of them), it is possible that the logic take this into consideration.
For example, a Combiner that runs over data at during compactions, might not see all of the values for a given Key. The Combiner must recognize this and not perform any function that would be incorrect due to the missing values.
Testing
The Iterator test harness is generalized testing framework for Accumulo Iterators that can identify common pitfalls in user-created Iterators.
 我的书签
 我的书签
                                 添加书签
 添加书签 移除书签
 移除书签