- 7.1. Map
- 7.2. Multimap
- 7.3. Set
- 7.4. SortedSet
- 7.5. ScoredSortedSet
- 7.6. LexSortedSet
- 7.7. List
- 7.8. Queue
- 7.9. Deque
- 7.10. Blocking Queue
- 7.11. Bounded Blocking Queue
- 7.12. Blocking Deque
- 7.13. Blocking Fair Queue
- 7.14. Blocking Fair Deque
- 7.15. Delayed Queue
- 7.16. Priority Queue
- 7.17. Priority Deque
- 7.18. Priority Blocking Queue
- 7.19. Priority Blocking Deque
- 7.20. Stream
- 7.21. Ring Buffer
- 7.22. Transfer Queue
- 7.23. Time Series
7.1. Map
Redis based distributed Map object for Java implements the ConcurrentMap interface. This object is fully thread-safe. Consider using the Live Object service to store a POJO object as a Redis Map. Redis uses the serialized state to check key uniqueness instead of the key’s hashCode()/equals() methods.
It has Async, Reactive and RxJava3 interfaces.
If the Map is used mostly for read operations and/or network roundtrips are undesirable, use a Map with Local cache support.
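The note on key uniqueness can be illustrated without Redisson at all. The sketch below is a minimal plain-Java illustration, assuming JDK serialization as a stand-in for whatever codec is configured; `SerializedKeySketch` and `CaseInsensitiveKey` are hypothetical names introduced only for this example. It shows two keys that are equal according to equals()/hashCode() but differ in their serialized bytes, so a store that compares serialized state would treat them as two distinct keys.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Locale;

final class SerializedKeySketch {

    // Hypothetical key type: equals()/hashCode() ignore case, the stored field does not.
    static final class CaseInsensitiveKey implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;

        CaseInsensitiveKey(String id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof CaseInsensitiveKey
                    && ((CaseInsensitiveKey) o).id.equalsIgnoreCase(id);
        }

        @Override
        public int hashCode() {
            return id.toLowerCase(Locale.ROOT).hashCode();
        }
    }

    // Stand-in for a codec (assumption): JDK serialization of the key into bytes.
    static byte[] serialize(Serializable key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // True when both keys produce identical bytes, i.e. would collapse into one key.
    static boolean sameSerializedState(Serializable a, Serializable b) {
        return Arrays.equals(serialize(a), serialize(b));
    }

    public static void main(String[] args) {
        CaseInsensitiveKey lower = new CaseInsensitiveKey("abc");
        CaseInsensitiveKey upper = new CaseInsensitiveKey("ABC");
        System.out.println("equals(): " + lower.equals(upper));
        System.out.println("same serialized state: " + sameSerializedState(lower, upper));
    }
}
```

In practice this means key classes should serialize deterministically and should not rely on a custom equals() to merge keys that serialize differently.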
RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
// use fast* methods when previous value is not required
map.fastPut("a", new SomeObject());
map.fastPutIfAbsent("d", new SomeObject());
map.fastRemove("b");
RFuture<SomeObject> putAsyncFuture = map.putAsync("321", new SomeObject());
RFuture<Boolean> fastPutAsyncFuture = map.fastPutAsync("321", new SomeObject());
map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");
The RMap object allows binding a Lock/ReadWriteLock/Semaphore/CountDownLatch object per key:
RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
    MyValue v = map.get(k);
    // process value ...
} finally {
    keyLock.unlock();
}
RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {
    MyValue v = map.get(k);
    // process value ...
} finally {
    // release the read lock that was acquired above
    rwLock.readLock().unlock();
}
7.1.1. Map eviction, local cache and data partitioning
Redisson provides various Map structure implementations with three important features: local cache, data partitioning and entry eviction (available in lua, advanced and native variants).
- local cache - so called near cache, used to speed up read operations and avoid network roundtrips. It caches Map entries on the Redisson side and executes read operations up to 45x faster in comparison with the common implementation. Local cache instances with the same name are connected to the same pub/sub channel. This channel is used for exchanging update/invalidate events between all instances. The local cache store doesn’t use the hashCode()/equals() methods of the key object; instead it uses a hash of the serialized state.
- data partitioning - although the Map object is cluster compatible, its content isn’t scaled/partitioned across multiple Redis master nodes in a cluster. Data partitioning allows scaling available memory, read/write operations and the entry eviction process for an individual Map instance in a Redis cluster.
- lua entry eviction - allows defining time to live or max idle time parameters per map entry. The Redis hash structure doesn’t support eviction, thus it’s done on the Redisson side through a custom scheduled task which removes expired entries using a lua script. The eviction task is started once per unique object name by getMapCache() method execution. So even if an instance isn’t used and has expired entries, it should be obtained through the getMapCache() method to start the eviction process. This leads to extra Redis calls and an eviction task per unique map object name.
- advanced entry eviction - allows defining a time to live parameter per map entry. Doesn’t use an entry eviction task.
- native entry eviction - allows defining a time to live parameter per map entry. Doesn’t use an entry eviction task. Requires Redis 7.4+.
Below is the list of all available Map implementations:
RedissonClient method name | Local cache | Data partitioning | Entry eviction | Ultra-fast read/write |
---|---|---|---|---|
getMap() open-source version | ❌ | ❌ | ❌ | ❌ |
getMapCache() open-source version | ❌ | ❌ | lua | ❌ |
getMapCacheNative() open-source version | ❌ | ❌ | native | ❌ |
getLocalCachedMap() open-source version | ✔️ | ❌ | ❌ | ❌ |
getMap() Redisson PRO version | ❌ | ❌ | ❌ | ✔️ |
getMapCache() Redisson PRO version | ❌ | ❌ | lua | ✔️ |
getMapCacheNative() Redisson PRO version | ❌ | ❌ | native | ✔️ |
getMapCacheV2() available only in Redisson PRO | ❌ | ❌ | advanced | ✔️ |
getLocalCachedMap() Redisson PRO version | ✔️ | ❌ | ❌ | ✔️ |
getLocalCachedMapCache() available only in Redisson PRO | ✔️ | ❌ | lua | ✔️ |
getLocalCachedMapCacheV2() available only in Redisson PRO | ✔️ | ✔️ | advanced | ✔️ |
getClusteredMap() available only in Redisson PRO | ❌ | ✔️ | ❌ | ✔️ |
getClusteredLocalCachedMap() available only in Redisson PRO | ✔️ | ✔️ | ❌ | ✔️ |
getClusteredMapCache() available only in Redisson PRO | ❌ | ✔️ | lua | ✔️ |
getClusteredLocalCachedMapCache() available only in Redisson PRO | ✔️ | ✔️ | lua | ✔️ |
Redisson also provides Spring Cache and JCache implementations.
Eviction
Map object with eviction support implements the org.redisson.api.RMapCache interface, which extends the java.util.concurrent.ConcurrentMap interface. It also has Async, Reactive and RxJava3 interfaces.
The current Redis implementation doesn’t have map entry eviction functionality. Therefore expired entries are cleaned from time to time by org.redisson.eviction.EvictionScheduler. It removes 100 expired entries at once. The task launch time is tuned automatically: it depends on the number of expired entries deleted during the previous run and varies between 5 seconds and half an hour. Thus if the clean task deletes 100 entries each time, it will be executed every 5 seconds (minimum execution delay). But if the current number of expired entries is lower than the previous one, the execution delay is increased by 1.5 times, and decreased otherwise.
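The self-tuning delay described above can be sketched as a small pure function. This is a minimal illustration of the rule as stated in the text, not the actual EvictionScheduler code; the class name, method name and the rounding/clamping details are assumptions made for the example.

```java
final class EvictionDelaySketch {

    static final int BATCH_SIZE = 100;                  // entries removed per run (from the text)
    static final long MIN_DELAY_MS = 5_000L;            // 5 seconds
    static final long MAX_DELAY_MS = 30 * 60 * 1_000L;  // half an hour
    static final double FACTOR = 1.5;

    // Computes the delay before the next clean-up run (sketch, rounding is an assumption).
    static long nextDelayMillis(long currentDelayMs, int removedNow, int removedBefore) {
        if (removedNow >= BATCH_SIZE) {
            // A full batch was removed: run again after the minimum delay.
            return MIN_DELAY_MS;
        }
        double next = removedNow < removedBefore
                ? currentDelayMs * FACTOR   // fewer expired entries than last time: back off
                : currentDelayMs / FACTOR;  // otherwise: run more often
        // Keep the result inside the [5 seconds, 30 minutes] window.
        return Math.max(MIN_DELAY_MS, Math.min(MAX_DELAY_MS, Math.round(next)));
    }

    public static void main(String[] args) {
        System.out.println(nextDelayMillis(10_000L, 10, 50));
        System.out.println(nextDelayMillis(15_000L, 50, 10));
    }
}
```

A map that keeps producing full batches of expired entries therefore stays at the minimum delay, while a mostly idle map drifts towards the maximum.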
It’s recommended to use a single instance of the RMapCache object with the same name for each Redisson client instance.
Code example:
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap", MapCacheOptions.defaults());
// or
RMapCacheV2<String, SomeObject> map = redisson.getMapCacheV2("anyMap");
// or
RMapCacheV2<String, SomeObject> map = redisson.getMapCacheV2("anyMap", MapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap", MapCacheOptions.defaults());
// ttl = 10 minutes
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// ttl = 3 seconds
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
// if object is not used anymore
map.destroy();
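How the two per-entry limits interact can be shown with a small model. The sketch below is an illustration under assumptions, not Redisson code: the class `ExpiringEntrySketch` is hypothetical, timestamps are passed in explicitly, and a value of 0 is taken to mean "no limit". An entry is considered expired as soon as either the time to live (counted from creation) or the max idle time (counted from the last access) has elapsed.

```java
import java.util.concurrent.TimeUnit;

final class ExpiringEntrySketch {

    private final long createdAtMs;
    private final long ttlMs;      // 0 means no time to live limit in this sketch
    private final long maxIdleMs;  // 0 means no idle limit in this sketch
    private long lastAccessMs;

    ExpiringEntrySketch(long nowMs, long ttl, TimeUnit ttlUnit, long maxIdle, TimeUnit idleUnit) {
        this.createdAtMs = nowMs;
        this.lastAccessMs = nowMs;
        this.ttlMs = ttlUnit.toMillis(ttl);
        this.maxIdleMs = idleUnit.toMillis(maxIdle);
    }

    // Records a read of the entry, which resets the idle timer but not the ttl.
    void touch(long nowMs) {
        lastAccessMs = nowMs;
    }

    // Expired when either limit has been reached.
    boolean isExpired(long nowMs) {
        boolean ttlPassed = ttlMs > 0 && nowMs - createdAtMs >= ttlMs;
        boolean idlePassed = maxIdleMs > 0 && nowMs - lastAccessMs >= maxIdleMs;
        return ttlPassed || idlePassed;
    }

    public static void main(String[] args) {
        // ttl = 10 minutes, maxIdleTime = 10 seconds, created at t = 0
        ExpiringEntrySketch entry =
                new ExpiringEntrySketch(0L, 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
        entry.touch(5_000L);
        System.out.println(entry.isExpired(14_000L));
        System.out.println(entry.isExpired(15_000L));
    }
}
```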
Local cache
Map object with local cache support implements org.redisson.api.RLocalCachedMap, which extends the java.util.concurrent.ConcurrentMap interface. This object is fully thread-safe.
It’s recommended to use a single LocalCachedMap instance per name for each Redisson client instance. The same LocalCachedMapOptions object should be used across all instances with the same name.
The following options can be supplied during object creation:
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
    // Defines whether to store a cache miss into the local cache.
    // Default value is false.
    .storeCacheMiss(false)
    // Defines store mode of cache data.
    // The following options are available:
    // LOCALCACHE - store data in local cache only and use Redis only for data update/invalidation.
    // LOCALCACHE_REDIS - store data in both Redis and local cache.
    .storeMode(StoreMode.LOCALCACHE_REDIS)
    // Defines Cache provider used as local cache store.
    // The following options are available:
    // REDISSON - uses Redisson own implementation
    // CAFFEINE - uses Caffeine implementation
    .cacheProvider(CacheProvider.REDISSON)
    // Defines local cache eviction policy.
    // The following options are available:
    // LFU - Counts how often an item was requested. Those that are used least often are discarded first.
    // LRU - Discards the least recently used items first
    // SOFT - Uses soft references, entries are removed by GC
    // WEAK - Uses weak references, entries are removed by GC
    // NONE - No eviction
    .evictionPolicy(EvictionPolicy.NONE)
    // If cache size is 0 then local cache is unbounded.
    .cacheSize(1000)
    // Defines strategy for loading missed local cache updates after Redis connection failure.
    //
    // The following reconnection strategies are available:
    // CLEAR - Clear local cache if map instance has been disconnected for a while.
    // LOAD - Store invalidated entry hash in invalidation log for 10 minutes.
    //        Cache keys for stored invalidated entry hashes are removed
    //        if the LocalCachedMap instance has been disconnected for less than 10 minutes,
    //        otherwise the whole cache is cleared.
    // NONE - Default. No reconnection handling
    .reconnectionStrategy(ReconnectionStrategy.NONE)
    // Defines local cache synchronization strategy.
    //
    // The following sync strategies are available:
    // INVALIDATE - Default. Invalidate cache entry across all LocalCachedMap instances on map entry change
    // UPDATE - Insert/update cache entry across all LocalCachedMap instances on map entry change
    // NONE - No synchronizations on map changes
    .syncStrategy(SyncStrategy.INVALIDATE)
    // time to live for each map entry in local cache
    .timeToLive(10000)
    // or
    .timeToLive(10, TimeUnit.SECONDS)
    // max idle time for each map entry in local cache
    .maxIdle(10000)
    // or
    .maxIdle(10, TimeUnit.SECONDS);
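The difference between the INVALIDATE, UPDATE and NONE sync strategies is easiest to see in a toy model. The sketch below is plain Java and does not use Redisson: `Backend` stands in for the Redis-side map plus the shared pub/sub channel, `NearCache` stands in for one local cache instance, and all of these names are hypothetical. It only models what happens to the other instances’ local copies when one instance writes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SyncStrategySketch {

    enum Strategy { INVALIDATE, UPDATE, NONE }

    // Stand-in for the Redis-side map plus the pub/sub channel shared by all instances.
    static final class Backend {
        final Map<String, String> store = new HashMap<>();
        final List<NearCache> subscribers = new ArrayList<>();
        final Strategy strategy;

        Backend(Strategy strategy) {
            this.strategy = strategy;
        }
    }

    // Stand-in for one local cache instance.
    static final class NearCache {
        final Map<String, String> local = new HashMap<>();
        private final Backend backend;

        NearCache(Backend backend) {
            this.backend = backend;
            backend.subscribers.add(this);
        }

        String get(String key) {
            // Served from the local cache when present, otherwise read remotely and cached.
            return local.computeIfAbsent(key, backend.store::get);
        }

        void put(String key, String value) {
            backend.store.put(key, value);
            local.put(key, value);
            // Notify every other instance about the change.
            for (NearCache other : backend.subscribers) {
                if (other != this) {
                    other.onRemoteChange(key, value);
                }
            }
        }

        private void onRemoteChange(String key, String value) {
            switch (backend.strategy) {
                case INVALIDATE:
                    local.remove(key);      // next read goes back to the backend
                    break;
                case UPDATE:
                    local.put(key, value);  // new value is pushed into the local copy
                    break;
                default:
                    break;                  // NONE: local copy may become stale
            }
        }
    }

    public static void main(String[] args) {
        Backend backend = new Backend(Strategy.INVALIDATE);
        NearCache first = new NearCache(backend);
        NearCache second = new NearCache(backend);
        first.put("k", "v1");
        second.get("k");
        first.put("k", "v2");
        System.out.println("second still caches k: " + second.local.containsKey("k"));
    }
}
```

The trade-off the model makes visible: INVALIDATE costs one extra remote read after a change, UPDATE sends the value to every instance up front, and NONE avoids the traffic at the price of stale reads.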
Code example:
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
// or
RLocalCachedMap<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapCacheOptions.defaults());
// or
RLocalCachedMap<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapCacheOptions.defaults());
// or
RLocalCachedMap<String, SomeObject> map = redisson.getClusteredLocalCachedMap("anyMap", LocalCachedMapOptions.defaults());
Integer prevObject = map.put("123", 1);
Integer currentObject = map.putIfAbsent("323", 2);
Integer obj = map.remove("123");
// use fast* methods when previous value is not required
map.fastPut("a", 1);
map.fastPutIfAbsent("d", 32);
map.fastRemove("b");
RFuture<Integer> putAsyncFuture = map.putAsync("321", 3);
RFuture<Boolean> fastPutAsyncFuture = map.fastPutAsync("321", 3);
map.fastPutAsync("321", 4);
map.fastRemoveAsync("321");
The object should be destroyed if it is not used anymore, but it’s not necessary to call the destroy method if Redisson is shutting down.
RLocalCachedMap<String, Integer> map = ...
map.destroy();
How to load data while avoiding invalidation message traffic.
Code example:
public void loadData(String cacheName, Map<String, String> data) {
RLocalCachedMap<String, String> clearMap = redisson.getLocalCachedMap(cacheName,
LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.INVALIDATE));
RLocalCachedMap<String, String> loadMap = redisson.getLocalCachedMap(cacheName,
LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.NONE));
loadMap.putAll(data);
clearMap.clearLocalCache();
}
Data partitioning
Map object with data partitioning support implements org.redisson.api.RClusteredMap, which extends the java.util.concurrent.ConcurrentMap interface. Read more details about data partitioning here.
Code example:
RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");
// or
RClusteredMap<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapCacheOptions.defaults());
// or
RClusteredMap<String, SomeObject> map = redisson.getClusteredLocalCachedMap("anyMap", LocalCachedMapOptions.defaults());
// or
RClusteredMap<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
map.fastPut("321", new SomeObject());
map.fastRemove("321");
7.1.2. Map persistence
Redisson allows storing Map data in external storage in addition to the Redis store.
Use cases:
- Redisson Map object as a cache between an application and external storage.
- Increase durability of Redisson Map data and life-span of evicted entries.
- Caching for databases, web services or any other data source.
Read-through strategy
If the requested entry doesn’t exist in the Redisson Map object, then it will be loaded using the provided MapLoader object. Code example:
// conn is an open java.sql.Connection created elsewhere
MapLoader<String, String> mapLoader = new MapLoader<String, String>() {

    @Override
    public Iterable<String> loadAllKeys() {
        List<String> list = new ArrayList<String>();
        // try-with-resources closes the statement and result set
        try (Statement statement = conn.createStatement();
             ResultSet result = statement.executeQuery("SELECT id FROM student")) {
            while (result.next()) {
                list.add(result.getString(1));
            }
        } catch (SQLException e) {
            throw new IllegalStateException("Unable to load keys", e);
        }
        return list;
    }

    @Override
    public String load(String key) {
        try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT name FROM student WHERE id = ?")) {
            preparedStatement.setString(1, key);
            try (ResultSet result = preparedStatement.executeQuery()) {
                // null tells the caller that no entry exists for this key
                return result.next() ? result.getString(1) : null;
            }
        } catch (SQLException e) {
            throw new IllegalStateException("Unable to load entry for key " + key, e);
        }
    }
};
Configuration example:
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.loader(mapLoader);
MapCacheOptions<K, V> mcoptions = MapCacheOptions.<K, V>defaults()
.loader(mapLoader);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", mcoptions);
// or with performance boost up to 45x
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with performance boost up to 45x
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", mcoptions);
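The read-through behaviour itself is independent of Redisson and can be sketched with JDK classes only. In the minimal example below, `ReadThroughSketch` is a hypothetical class, a `Function` plays the role of the MapLoader’s load method, and a counter records how often the external source is hit. It illustrates the idea (load on miss, then serve from the map); it does not reproduce Redisson internals.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ReadThroughSketch<K, V> {

    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;            // stand-in for MapLoader.load(key)
    final AtomicInteger loads = new AtomicInteger(); // number of calls to the external source

    ReadThroughSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        // On a miss the loader is called and a non-null result is stored.
        // A null result is not stored, so the next get() asks the loader again.
        return cache.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    public static void main(String[] args) {
        ReadThroughSketch<String, String> students =
                new ReadThroughSketch<>(id -> "1".equals(id) ? "Alice" : null);
        System.out.println(students.get("1"));
        System.out.println(students.get("1"));
        System.out.println("loader calls: " + students.loads.get());
    }
}
```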
Write-through (synchronous) strategy
When a Map entry is being updated, the method won’t return until Redisson has updated it in the external storage using the MapWriter object. Code example:
// conn is an open java.sql.Connection created elsewhere
MapWriter<String, String> mapWriter = new MapWriter<String, String>() {

    @Override
    public void write(Map<String, String> map) {
        // try-with-resources closes the statement
        try (PreparedStatement preparedStatement = conn.prepareStatement("INSERT INTO student (id, name) values (?, ?)")) {
            for (Entry<String, String> entry : map.entrySet()) {
                preparedStatement.setString(1, entry.getKey());
                preparedStatement.setString(2, entry.getValue());
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } catch (SQLException e) {
            throw new IllegalStateException("Unable to write entries", e);
        }
    }

    @Override
    public void delete(Collection<String> keys) {
        try (PreparedStatement preparedStatement = conn.prepareStatement("DELETE FROM student WHERE id = ?")) {
            for (String key : keys) {
                preparedStatement.setString(1, key);
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } catch (SQLException e) {
            throw new IllegalStateException("Unable to delete entries", e);
        }
    }
};
Configuration example:
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_THROUGH);
MapCacheOptions<K, V> mcoptions = MapCacheOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_THROUGH);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", mcoptions);
// or with performance boost up to 45x
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with performance boost up to 45x
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", mcoptions);
Write-behind (asynchronous) strategy
Updates of the Map object are accumulated in batches and asynchronously written to external storage, with a defined delay, through the MapWriter object.
writeBehindDelay - delay of batched write or delete operation. Default value is 1000 milliseconds.
writeBehindBatchSize - size of batch. Each batch contains Map Entry write or delete commands. Default value is 50.
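The interplay of the two settings can be sketched in plain Java. The model below is an assumption-laden illustration, not Redisson’s implementation: `WriteBehindSketch` is a hypothetical class, `enqueue` stands for a map update that returns immediately, and `flush` stands for what a timer would do once per writeBehindDelay, splitting the accumulated updates into batches of at most writeBehindBatchSize entries.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class WriteBehindSketch {

    private final int batchSize; // plays the role of writeBehindBatchSize
    private final List<Map.Entry<String, String>> pending = new ArrayList<>();
    // Each element is one batch that would be handed to MapWriter.write(...).
    final List<List<Map.Entry<String, String>>> writtenBatches = new ArrayList<>();

    WriteBehindSketch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Called on every map update: only queues the write and returns immediately.
    void enqueue(String key, String value) {
        pending.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    // Called once per delay interval: drains the queue in batches.
    void flush() {
        for (int from = 0; from < pending.size(); from += batchSize) {
            int to = Math.min(from + batchSize, pending.size());
            writtenBatches.add(new ArrayList<>(pending.subList(from, to)));
        }
        pending.clear();
    }

    public static void main(String[] args) {
        WriteBehindSketch buffer = new WriteBehindSketch(50);
        for (int i = 0; i < 120; i++) {
            buffer.enqueue("id-" + i, "name-" + i);
        }
        buffer.flush();
        System.out.println("batches written: " + buffer.writtenBatches.size());
    }
}
```

A longer delay and a larger batch size reduce the number of calls to the external storage, at the cost of a wider window during which the storage lags behind the map.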
Configuration example:
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_BEHIND)
.writeBehindDelay(5000)
.writeBehindBatchSize(100);
MapCacheOptions<K, V> mcoptions = MapCacheOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_BEHIND)
.writeBehindDelay(5000)
.writeBehindBatchSize(100);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", mcoptions);
// or with performance boost up to 45x
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with performance boost up to 45x
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", mcoptions);
This feature is available for RMap, RMapCache, RLocalCachedMap and RLocalCachedMapCache objects.
Usage of RLocalCachedMap and RLocalCachedMapCache objects boosts Redis read operations up to 45x and gives almost instant speed for a database, web service or any other data source.
7.1.3. Map listeners
Redisson allows binding listeners per RMap object.
The RMap object allows tracking the following events over the data.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Entry created/removed/updated after read operation |
org.redisson.api.listener.MapPutListener | Entry created/updated |
org.redisson.api.listener.MapRemoveListener | Entry removed |
org.redisson.api.ExpiredObjectListener | RMap object expired |
org.redisson.api.DeletedObjectListener | RMap object deleted |
Usage examples:
RMap<String, SomeObject> map = redisson.getMap("anyMap");
// each addListener call returns its own id, kept in a separate variable
int deletedListenerId = map.addListener(new DeletedObjectListener() {
    @Override
    public void onDeleted(String name) {
        // ...
    }
});
int expiredListenerId = map.addListener(new ExpiredObjectListener() {
    @Override
    public void onExpired(String name) {
        // ...
    }
});
int putListenerId = map.addListener(new MapPutListener() {
    @Override
    public void onPut(String name) {
        // ...
    }
});
int removeListenerId = map.addListener(new MapRemoveListener() {
    @Override
    public void onRemove(String name) {
        // ...
    }
});
// a listener is removed by the id returned on registration
map.removeListener(deletedListenerId);
The RMapCache object allows tracking additional events over the data.
Listener class name | Event description |
---|---|
org.redisson.api.map.event.EntryCreatedListener | Entry created |
org.redisson.api.map.event.EntryExpiredListener | Entry expired |
org.redisson.api.map.event.EntryRemovedListener | Entry removed |
org.redisson.api.map.event.EntryUpdatedListener | Entry updated |
Usage examples:
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache(LocalCachedMapCacheOptions.name("anyMap"));
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
// listener type parameters match the map's key and value types
int updatedListenerId = map.addListener(new EntryUpdatedListener<String, SomeObject>() {
    @Override
    public void onUpdated(EntryEvent<String, SomeObject> event) {
        event.getKey(); // key
        event.getValue(); // new value
        event.getOldValue(); // old value
        // ...
    }
});
int createdListenerId = map.addListener(new EntryCreatedListener<String, SomeObject>() {
    @Override
    public void onCreated(EntryEvent<String, SomeObject> event) {
        event.getKey(); // key
        event.getValue(); // value
        // ...
    }
});
int expiredListenerId = map.addListener(new EntryExpiredListener<String, SomeObject>() {
    @Override
    public void onExpired(EntryEvent<String, SomeObject> event) {
        event.getKey(); // key
        event.getValue(); // value
        // ...
    }
});
int removedListenerId = map.addListener(new EntryRemovedListener<String, SomeObject>() {
    @Override
    public void onRemoved(EntryEvent<String, SomeObject> event) {
        event.getKey(); // key
        event.getValue(); // value
        // ...
    }
});
// a listener is removed by the id returned on registration
map.removeListener(updatedListenerId);
7.1.4. LRU/LFU bounded Map
A Map object which implements the RMapCache interface can be bounded using Least Recently Used (LRU) or Least Frequently Used (LFU) order. A bounded Map stores map entries within a defined limit and retires entries in the defined order.
Use cases: limited Redis memory.
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap", MapCacheOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap", MapCacheOptions.defaults());
// tries to limit the map to 10 entries using LRU eviction algorithm
map.trySetMaxSize(10);
// ... using LFU eviction algorithm
map.trySetMaxSize(10, EvictionMode.LFU);
// sets or changes the map limit to 10 entries using LRU eviction algorithm
map.setMaxSize(10);
// ... using LFU eviction algorithm
map.setMaxSize(10, EvictionMode.LFU);
// values match the map's declared value type
map.put("1", new SomeObject());
map.put("3", new SomeObject(), 1, TimeUnit.SECONDS);
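What "bounded in LRU order" means for the entries that get retired can be shown with a local JDK analogue. The sketch below uses `LinkedHashMap` in access order and is only an in-process illustration of the LRU policy; `LruBoundedMapSketch` is a hypothetical class, LFU is not covered, and Redisson applies its limit on the Redis side rather than in a local map like this one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruBoundedMapSketch<K, V> extends LinkedHashMap<K, V> {

    private static final long serialVersionUID = 1L;
    private final int maxSize;

    LruBoundedMapSketch(int maxSize) {
        // accessOrder = true: iteration order goes from least to most recently used
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion: retire the least recently used entry when over the limit.
        return size() > maxSize;
    }

    public static void main(String[] args) {
        LruBoundedMapSketch<String, String> map = new LruBoundedMapSketch<>(2);
        map.put("a", "1");
        map.put("b", "2");
        map.get("a");       // "a" becomes the most recently used entry
        map.put("c", "3");  // exceeds the limit, so the least recently used entry is retired
        System.out.println(map.keySet());
    }
}
```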
7.2. Multimap
Redis based Multimap for Java allows binding multiple values per key. This object is fully thread-safe. The number of keys is limited by Redis to 4 294 967 295 elements. Redis uses the serialized state to check key uniqueness instead of the key’s hashCode()/equals() methods.
It has Async, Reactive and RxJava3 interfaces.
7.2.1. Set based Multimap
Set based Multimap doesn’t allow duplicate values per key.
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
7.2.2. List based Multimap
List based Multimap object for Java stores entries in insertion order and allows duplicates for values mapped to key.
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> allValues = map.get(new SimpleKey("0"));
Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
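The difference between the two Multimap flavours comes down to the collection kept per key. The sketch below models that with JDK collections only; the class and method names are hypothetical, and plain String keys and values are used instead of SimpleKey/SimpleValue to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MultimapSemanticsSketch {

    // Set based: a value is stored at most once per key; returns false for a duplicate.
    static boolean putIntoSetMultimap(Map<String, Set<String>> multimap, String key, String value) {
        return multimap.computeIfAbsent(key, k -> new HashSet<>()).add(value);
    }

    // List based: values are appended in insertion order, duplicates included.
    static boolean putIntoListMultimap(Map<String, List<String>> multimap, String key, String value) {
        return multimap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> setBased = new HashMap<>();
        Map<String, List<String>> listBased = new HashMap<>();
        for (String value : new String[] {"1", "2", "1"}) {
            putIntoSetMultimap(setBased, "0", value);
            putIntoListMultimap(listBased, "0", value);
        }
        System.out.println("set based size: " + setBased.get("0").size());
        System.out.println("list based values: " + listBased.get("0"));
    }
}
```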
7.2.3. Multimap eviction
Multimap object for Java with eviction support is implemented by a separate MultimapCache object. There are RSetMultimapCache and RListMultimapCache objects for Set and List Multimaps respectively.
Expired entries are cleaned by org.redisson.eviction.EvictionScheduler. It removes 100 expired entries at once. The task launch time is tuned automatically: it depends on the number of expired entries deleted during the previous run and varies between 1 second and 2 hours. Thus if the clean task deletes 100 entries each time, it will be executed every second (minimum execution delay). But if the current number of expired entries is lower than the previous one, the execution delay is increased by 1.5 times.
RSetMultimapCache example:
RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");
multimap.put("2", "e");
multimap.put("2", "f");
multimap.expireKey("2", 10, TimeUnit.MINUTES);
7.3. Set
Redis based Set object for Java implements the java.util.Set interface. This object is fully thread-safe. It keeps elements unique via element state comparison. Set size is limited by Redis to 4 294 967 295 elements. Redis uses the serialized state to check value uniqueness instead of the value’s hashCode()/equals() methods.
It has Async, Reactive and RxJava3 interfaces.
RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());
The RSet object allows binding a Lock/ReadWriteLock/Semaphore/CountDownLatch object per value:
RSet<MyObject> set = redisson.getSet("anySet");
MyObject value = new MyObject();
// the lock is obtained from the set itself
RLock lock = set.getLock(value);
lock.lock();
try {
    // process value ...
} finally {
    lock.unlock();
}
7.3.1. Set eviction and data partitioning
Redisson provides various Set structure implementations with two important features: data partitioning and entry eviction (available in regular and advanced variants).
- data partitioning - although the Set object is cluster compatible, its content isn’t scaled/partitioned across multiple Redis master nodes in a cluster. Data partitioning allows scaling available memory, read/write operations and the entry eviction process for an individual Set instance in a Redis cluster.
- entry eviction - allows defining time to live per set value. The Redis set structure doesn’t support eviction, thus it’s done on the Redisson side through a custom scheduled task which removes expired entries. The eviction task is started once per unique object name by getSetCache() method execution. If an object instance isn’t used and has expired elements, it should be obtained through the getSetCache() method to start the eviction process. This leads to extra Redis calls and an eviction task per unique set object name.
- advanced entry eviction - improved version of the entry eviction process. Doesn’t use an entry eviction task.
Below is the list of all available Set implementations:
RedissonClient method name | Data partitioning | Entry eviction | Advanced entry eviction | Ultra-fast read/write |
---|---|---|---|---|
getSet() open-source version | ❌ | ❌ | ❌ | ❌ |
getSetCache() open-source version | ❌ | ✔️ | ❌ | ❌ |
getSet() Redisson PRO version | ❌ | ❌ | ❌ | ✔️ |
getSetCache() Redisson PRO version | ❌ | ✔️ | ❌ | ✔️ |
getSetCacheV2() available only in Redisson PRO | ✔️ | ❌ | ✔️ | ✔️ |
getClusteredSet() available only in Redisson PRO | ✔️ | ❌ | ❌ | ✔️ |
getClusteredSetCache() available only in Redisson PRO | ✔️ | ✔️ | ❌ | ✔️ |
Eviction
Set object with eviction support implements org.redisson.api.RSetCache, which extends the java.util.Set interface. It has Async, Reactive and RxJava3 interfaces.
The current Redis implementation doesn’t have set value eviction functionality. Therefore expired entries are cleaned by org.redisson.eviction.EvictionScheduler. It removes 300 expired entries at once. The task launch time is tuned automatically: it depends on the number of expired entries deleted during the previous run and varies between 1 second and 1 hour. Thus if the clean task deletes 300 entries each time, it will be executed every second (minimum execution delay). But if the current number of expired values is lower than the previous one, the execution delay is increased by 1.5 times.
Code example:
RSetCache<SomeObject> set = redisson.getSetCache("mySet");
// or
RSetCache<SomeObject> set = redisson.getClusteredSetCache("mySet");
// ttl = 10 minutes
set.add(new SomeObject(), 10, TimeUnit.MINUTES);
// if object is not used anymore
set.destroy();
Data partitioning
Set object with data partitioning support implements org.redisson.api.RClusteredSet, which extends the java.util.Set interface. Read more details about data partitioning here.
Code example:
RClusteredSet<SomeObject> set = redisson.getClusteredSet("mySet");
// or
RClusteredSet<SomeObject> set = redisson.getClusteredSetCache("mySet");
// ttl = 10 minutes,
set.add(new SomeObject(), 10, TimeUnit.MINUTES);
7.3.2. Set listeners
Redisson allows binding listeners per RSet object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element added/removed/updated after read operation |
org.redisson.api.ExpiredObjectListener | RSet object expired |
org.redisson.api.DeletedObjectListener | RSet object deleted |
org.redisson.api.listener.SetAddListener | Element added |
org.redisson.api.listener.SetRemoveListener | Element removed |
org.redisson.api.listener.SetRemoveRandomListener | Element randomly removed |
Usage example:
RSet<String> set = redisson.getSet("anySet");
int listenerId = set.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
// ...
set.removeListener(listenerId);
7.4. SortedSet
Redis based distributed SortedSet for Java implements the java.util.SortedSet interface. This object is fully thread-safe. It uses a comparator to sort elements and keep them unique. For the String data type it’s recommended to use the LexSortedSet object for better performance.
RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // set object comparator
set.add(3);
set.add(1);
set.add(2);
set.removeAsync(0);
set.addAsync(5);
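The snippet above refers to MyComparator without defining it. The sketch below gives one hypothetical definition (descending integer order) and uses a JDK `TreeSet` as a local analogue to show how the comparator decides both ordering and uniqueness; the helper class `SortedSetComparatorSketch` is also an assumption introduced for the example, and nothing here is Redisson-specific.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical comparator: orders integers from largest to smallest.
final class MyComparator implements Comparator<Integer> {
    @Override
    public int compare(Integer left, Integer right) {
        return Integer.compare(right, left);
    }
}

final class SortedSetComparatorSketch {

    // Adds the values to a comparator-ordered set and returns them in iteration order.
    static List<Integer> sorted(int... values) {
        TreeSet<Integer> set = new TreeSet<>(new MyComparator());
        for (int value : values) {
            // Elements for which compare(...) returns 0 count as duplicates and are kept once.
            set.add(value);
        }
        return new ArrayList<>(set);
    }

    public static void main(String[] args) {
        System.out.println(sorted(3, 1, 2, 3));
    }
}
```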
7.5. ScoredSortedSet
Redis based distributed ScoredSortedSet object. It sorts elements by the score defined during element insertion and keeps elements unique via element state comparison.
It has Async, Reactive and RxJava3 interfaces. Set size is limited by Redis to 4 294 967 295 elements.
RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");
set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));
set.pollFirst();
set.pollLast();
int index = set.rank(new SomeObject(g, d)); // get element index
Double score = set.getScore(new SomeObject(g, d)); // get element score
7.5.1. ScoredSortedSet data partitioning
Although the RScoredSortedSet object is cluster compatible, its content isn’t scaled across multiple Redis master nodes. RScoredSortedSet data partitioning is available only in cluster mode and is implemented by a separate RClusteredScoredSortedSet object. Size is limited by the whole Redis Cluster memory. More about partitioning here.
Below is the list of all available RScoredSortedSet implementations:
RedissonClient method name | Data partitioning support | Ultra-fast read/write |
---|---|---|
getScoredSortedSet() open-source version | ❌ | ❌ |
getScoredSortedSet() Redisson PRO version | ❌ | ✔️ |
getClusteredScoredSortedSet() available only in Redisson PRO | ✔️ | ✔️ |
Code example:
RClusteredScoredSortedSet<String> set = redisson.getClusteredScoredSortedSet("simpleBitset");
set.add(1.1, "v1");
set.add(1.2, "v2");
set.add(1.3, "v3");
ScoredEntry<String> s = set.firstEntry();
ScoredEntry<String> e = set.pollFirstEntry();
7.5.2. ScoredSortedSet listeners
Redisson allows binding listeners per RScoredSortedSet object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element created/removed/updated after read operation |
org.redisson.api.listener.ScoredSortedSetAddListener | Element created/updated |
org.redisson.api.listener.ScoredSortedSetRemoveListener | Element removed |
org.redisson.api.ExpiredObjectListener | RScoredSortedSet object expired |
org.redisson.api.DeletedObjectListener | RScoredSortedSet object deleted |
Usage example:
RScoredSortedSet<String> set = redisson.getScoredSortedSet("anySet");
int listenerId = set.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
// ...
set.removeListener(listenerId);
7.6. LexSortedSet
Redis based distributed Set object for Java allows String objects only and implements the java.util.Set<String> interface. It keeps elements in lexicographical order and maintains element uniqueness via element state comparison.
It has Async, Reactive and RxJava3 interfaces.
RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");
set.rangeTail("d", false);
set.countHead("e");
set.range("d", true, "z", false);
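The range-style calls above have close JDK analogues on `NavigableSet`, which can help when reasoning about inclusive and exclusive bounds. The sketch below is a local illustration only: the class and its methods are hypothetical, the inclusive flags are always passed explicitly (the text does not state whether countHead("e") includes "e" itself), and `TreeSet<String>` ordering is assumed to match the lexicographical order for plain ASCII strings like these.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

final class LexRangeSketch {

    // Elements after (or from) the given element.
    static NavigableSet<String> rangeTail(NavigableSet<String> set, String from, boolean inclusive) {
        return set.tailSet(from, inclusive);
    }

    // Number of elements before (or up to) the given element.
    static int countHead(NavigableSet<String> set, String to, boolean inclusive) {
        return set.headSet(to, inclusive).size();
    }

    // Elements between two bounds, each bound independently inclusive or exclusive.
    static NavigableSet<String> range(NavigableSet<String> set, String from, boolean fromInclusive,
                                      String to, boolean toInclusive) {
        return set.subSet(from, fromInclusive, to, toInclusive);
    }

    public static void main(String[] args) {
        NavigableSet<String> set = new TreeSet<>(Arrays.asList("d", "e", "f"));
        System.out.println(rangeTail(set, "d", false));
        System.out.println(countHead(set, "e", false));
        System.out.println(range(set, "d", true, "z", false));
    }
}
```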
7.6.1. LexSortedSet listeners
Redisson allows binding listeners to each RLexSortedSet object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element created/removed/updated after read operation |
org.redisson.api.listener.ScoredSortedSetAddListener | Element created/updated |
org.redisson.api.listener.ScoredSortedSetRemoveListener | Element removed |
org.redisson.api.ExpiredObjectListener | RLexSortedSet object expired |
org.redisson.api.DeletedObjectListener | RLexSortedSet object deleted |
Usage example:
RLexSortedSet set = redisson.getLexSortedSet("anySet");
int listenerId = set.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
// ...
set.removeListener(listenerId);
7.7. List
Redis-based distributed List object for Java that implements the java.util.List interface. It keeps elements in insertion order.
It has Async, Reactive and RxJava3 interfaces. List size is limited by Redis to 4 294 967 295 elements.
RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());
7.7.1. List listeners
Redisson allows binding listeners to each RList object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element created/removed/updated after read operation |
org.redisson.api.listener.ListAddListener | Element created |
org.redisson.api.listener.ListInsertListener | Element inserted |
org.redisson.api.listener.ListSetListener | Element set/updated |
org.redisson.api.listener.ListRemoveListener | Element removed |
org.redisson.api.listener.ListTrimListener | List trimmed |
org.redisson.api.ExpiredObjectListener | RList object expired |
org.redisson.api.DeletedObjectListener | RList object deleted |
Usage example:
RList<String> list = redisson.getList("anyList");
int listenerId = list.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
// ...
list.removeListener(listenerId);
7.8. Queue
Redis-based distributed unbounded Queue object for Java that implements the java.util.Queue interface. This object is fully thread-safe.
It has Async, Reactive and RxJava3 interfaces.
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
7.8.1. Queue listeners
Redisson allows binding listeners to each RQueue object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element created/removed/updated after read operation |
org.redisson.api.listener.ListAddListener | Element created |
org.redisson.api.listener.ListRemoveListener | Element removed |
org.redisson.api.ExpiredObjectListener | RQueue object expired |
org.redisson.api.DeletedObjectListener | RQueue object deleted |
Usage example:
RQueue<String> queue = redisson.getQueue("anyList");
int listenerId = queue.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
// ...
queue.removeListener(listenerId);
7.9. Deque
Redis-based distributed unbounded Deque object for Java that implements the java.util.Deque interface. This object is fully thread-safe.
It has Async, Reactive and RxJava3 interfaces.
RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();
7.9.1. Deque listeners
Redisson allows binding listeners to each RDeque object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element created/removed/updated after read operation |
org.redisson.api.listener.ListAddListener | Element created |
org.redisson.api.listener.ListRemoveListener | Element removed |
org.redisson.api.ExpiredObjectListener | RDeque object expired |
org.redisson.api.DeletedObjectListener | RDeque object deleted |
Usage example:
RDeque<String> deque = redisson.getDeque("anyList");
int listenerId = deque.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
// ...
deque.removeListener(listenerId);
7.10. Blocking Queue
Redis-based distributed unbounded BlockingQueue object for Java that implements the java.util.concurrent.BlockingQueue interface. This object is fully thread-safe.
It has Async, Reactive and RxJava3 interfaces.
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject head = queue.peek();
SomeObject polled = queue.poll();
// waits up to 10 minutes for an element to become available
SomeObject polledWithTimeout = queue.poll(10, TimeUnit.MINUTES);
The poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically during reconnection to the Redis server or failover.
7.11. Bounded Blocking Queue
Redis-based distributed BoundedBlockingQueue for Java that implements the java.util.concurrent.BlockingQueue interface. BoundedBlockingQueue size is limited by Redis to 4 294 967 295 elements. This object is fully thread-safe.
Queue capacity should be defined once with the trySetCapacity() method before usage:
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// returns `true` if the capacity was set successfully and `false` if it was already set
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// blocks until free space is available in the queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
The poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically during reconnection to the Redis server or Redis server failover.
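The difference between the non-blocking offer and the blocking put on a full bounded queue can be seen with the JDK's own java.util.concurrent.ArrayBlockingQueue, which follows the same BlockingQueue contract. This is a minimal JDK-only sketch, not Redisson code; the class name BoundedQueueSketch and its helper are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only illustration of bounded-queue behaviour; not Redisson code.
class BoundedQueueSketch {

    // Offers 0, 1, 2, ... and reports how many elements were accepted before the queue was full.
    static int fillUntilRejected(BlockingQueue<Integer> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (!queue.offer(i)) { // offer() never blocks; false means there is no free space
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // capacity fixed at 2
        System.out.println(fillUntilRejected(queue, 5));            // prints 2
        // A timed offer waits for space and then gives up, instead of blocking forever like put().
        System.out.println(queue.offer(99, 100, TimeUnit.MILLISECONDS)); // prints false
        queue.poll();                                               // frees one slot
        queue.put(99);                                              // returns at once, space is available
        System.out.println(queue);                                  // prints [1, 99]
    }
}
```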
7.12. Blocking Deque
Java implementation of a Redis-based BlockingDeque that implements the java.util.concurrent.BlockingDeque interface. This object is fully thread-safe.
It has Async, Reactive and RxJava3 interfaces.
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = deque.takeFirst();
Integer lastValue = deque.takeLast();
// the timed variants wait up to the given time for an element
Integer polledFirst = deque.pollFirst(10, TimeUnit.MINUTES);
Integer polledLast = deque.pollLast(3, TimeUnit.MINUTES);
The poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically during reconnection to the Redis server or failover.
7.13. Blocking Fair Queue
Redis-based distributed BlockingFairQueue for Java that implements the java.util.concurrent.BlockingQueue interface. This object is fully thread-safe.
When queue consumers are located in different parts of the network, some are closer to Redis and some are further away. Because of network delays, the more distant consumers receive fewer messages from the queue, while the closer consumers receive more, which can overload those clients.
A blocking queue with fair polling guarantees the access order for the poll and take methods and so distributes consumption uniformly across consumers.
RBlockingFairQueue<SomeObject> queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());
SomeObject head = queue.peek();
SomeObject polled = queue.poll();
SomeObject polledWithTimeout = queue.poll(10, TimeUnit.MINUTES);
SomeObject taken = queue.take();
This feature is available only in the Redisson PRO edition.
7.14. Blocking Fair Deque
Redis-based distributed BlockingFairDeque for Java that implements the java.util.concurrent.BlockingDeque interface. This object is fully thread-safe.
When deque consumers are located in different parts of the network, some are closer to Redis and some are further away. Because of network delays, the more distant consumers receive fewer messages from the deque, while the closer consumers receive more, which can overload those clients.
A blocking deque with fair polling guarantees the access order for the poll, take, pollFirst, takeFirst, pollLast and takeLast methods and so distributes consumption uniformly across consumers.
RBlockingFairDeque<SomeObject> deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());
SomeObject peekedFirst = deque.peekFirst();
SomeObject polledFirst = deque.pollFirst();
SomeObject polledFirstWithTimeout = deque.pollFirst(10, TimeUnit.MINUTES);
SomeObject takenFirst = deque.takeFirst();
SomeObject peekedLast = deque.peekLast();
SomeObject polledLast = deque.pollLast();
SomeObject polledLastWithTimeout = deque.pollLast(10, TimeUnit.MINUTES);
SomeObject takenLast = deque.takeLast();
This feature is available only in the Redisson PRO edition.
7.15. Delayed Queue
Redis-based DelayedQueue object for Java that transfers each element to a destination queue after a specified delay. The destination queue can be any queue that implements the RQueue interface. This object is fully thread-safe.
It can be useful for an exponential backoff strategy used for message delivery to a consumer. If the application is restarted, an instance of the delayed queue should be created so that the pending items are added to the destination queue.
RBlockingQueue<String> destinationQueue = ...
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
// msg1 appears after about 10 seconds, so this poll returns it
destinationQueue.poll(15, TimeUnit.SECONDS);
// msg2 is transferred about 1 minute after it was offered, so this 2-second poll times out and returns null
destinationQueue.poll(2, TimeUnit.SECONDS);
The object should be destroyed when it is no longer used, but it is not necessary to call the destroy method if Redisson is shutting down.
RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();
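For the exponential backoff use case mentioned above, the delay passed to the delayed queue has to grow with each retry. The following is a minimal JDK-only sketch of one way to compute such a delay. The class BackoffSketch, the method delayFor and the doubling-with-cap policy are assumptions made for illustration; they are not part of Redisson.

```java
import java.time.Duration;

// Hypothetical helper; the names and the doubling policy are assumptions for illustration.
class BackoffSketch {

    // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
    static Duration delayFor(int attempt, Duration base, Duration max) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long baseMs = base.toMillis();
        long maxMs = max.toMillis();
        // Overflow guard: if doubling `attempt` times would exceed the cap, return the cap.
        if (attempt >= 62 || baseMs > (maxMs >> attempt)) {
            return max;
        }
        return Duration.ofMillis(baseMs << attempt);
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(1);
        Duration max = Duration.ofSeconds(30);
        for (int attempt = 0; attempt < 7; attempt++) {
            // With Redisson the result could be passed on as, for example,
            // delayedQueue.offer(msg, delay.toMillis(), TimeUnit.MILLISECONDS).
            Duration delay = delayFor(attempt, base, max);
            System.out.println(attempt + " -> " + delay.toSeconds() + "s");
        }
    }
}
```

With a 1-second base and a 30-second cap, the delays double from 1 second up to 16 seconds and then stay at 30 seconds.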
7.16. Priority Queue
Java implementation of a Redis-based PriorityQueue that implements the java.util.Queue interface. Elements are ordered according to the natural order defined by java.lang.Comparable or by a supplied java.util.Comparator. This object is fully thread-safe.
Use the trySetComparator() method to define a custom java.util.Comparator.
Code example:
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityQueue<Entry> queue = redisson.getPriorityQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry first = queue.poll();
// Entry [b:1]
Entry second = queue.poll();
// Entry [c:1]
Entry third = queue.poll();
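The example above relies on the natural order of Entry. To show what a custom java.util.Comparator changes, here is a minimal JDK-only sketch using java.util.PriorityQueue. The class names ComparatorSketch and ByValueDescThenKey are hypothetical. The comparator is written as a named class because this section does not state whether trySetComparator() accepts lambdas.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical JDK-only illustration of comparator-driven ordering; not Redisson code.
class ComparatorSketch {

    static final class Entry {
        final String key;
        final int value;

        Entry(String key, int value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Entry [" + key + ":" + value + "]";
        }
    }

    // Orders by value, highest first, and breaks ties by key.
    static final class ByValueDescThenKey implements Comparator<Entry> {
        @Override
        public int compare(Entry a, Entry b) {
            int byValue = Integer.compare(b.value, a.value);
            return byValue != 0 ? byValue : a.key.compareTo(b.key);
        }
    }

    public static void main(String[] args) {
        Queue<Entry> queue = new PriorityQueue<>(new ByValueDescThenKey());
        queue.add(new Entry("b", 1));
        queue.add(new Entry("c", 3));
        queue.add(new Entry("a", 1));
        System.out.println(queue.poll()); // prints Entry [c:3]
        System.out.println(queue.poll()); // prints Entry [a:1]
        System.out.println(queue.poll()); // prints Entry [b:1]
    }
}
```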
7.17. Priority Deque
Java implementation of a Redis-based PriorityDeque that implements the java.util.Deque interface. Elements are ordered according to the natural order defined by java.lang.Comparable or by a supplied java.util.Comparator. This object is fully thread-safe.
Use the trySetComparator() method to define a custom java.util.Comparator.
Code example:
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityDeque<Entry> queue = redisson.getPriorityDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry first = queue.pollFirst();
// Entry [c:1]
Entry last = queue.pollLast();
7.18. Priority Blocking Queue
Java implementation of a Redis-based PriorityBlockingQueue, similar to the JDK java.util.concurrent.PriorityBlockingQueue object. Elements are ordered according to the natural order defined by java.lang.Comparable or by a supplied java.util.Comparator. This object is fully thread-safe.
Use the trySetComparator() method to define a custom java.util.Comparator.
The poll, pollLastAndOfferFirstTo and take methods are resubscribed automatically during reconnection to the Redis server or failover.
Code example:
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityBlockingQueue<Entry> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.take();
7.19. Priority Blocking Deque
Java implementation of a Redis-based PriorityBlockingDeque that implements the java.util.concurrent.BlockingDeque interface. Elements are ordered according to the natural order defined by java.lang.Comparable or by a supplied java.util.Comparator. This object is fully thread-safe.
Use the trySetComparator() method to define a custom java.util.Comparator.
The poll, pollLastAndOfferFirstTo and take methods are resubscribed automatically during reconnection to the Redis server or failover.
Code example:
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityBlockingDeque<Entry> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry first = queue.takeFirst();
// Entry [c:1]
Entry last = queue.takeLast();
7.20. Stream
Java implementation of a Redis-based Stream object that wraps the Redis Streams feature. It allows creating consumer groups that consume data added by producers. This object is fully thread-safe.
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId sm = stream.add(StreamAddArgs.entry("0", "0"));
stream.createGroup("testGroup");
StreamMessageId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamMessageId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamMessageId, Map<String, String>> group = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
// return entries in pending state after read group method execution
Map<StreamMessageId, Map<String, String>> pendingData = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);
// transfer ownership of pending messages to a new consumer
List<StreamMessageId> transferedIds = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);
// mark pending entries as correctly processed
long amount = stream.ack("testGroup", id1, id2);
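The read, pending, claim and acknowledge steps above form a small lifecycle for each delivered entry. The following toy in-memory model sketches that lifecycle using only the JDK. It is a deliberate simplification and not how Redis or Redisson implement streams: the class ConsumerGroupSketch is hypothetical, entry ids are plain list indexes, and the group always starts reading from the first entry.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one consumer group; not Redisson or Redis code.
class ConsumerGroupSketch {
    private final List<String> log = new ArrayList<>();                 // appended entries; the index acts as id
    private final Map<Integer, String> pending = new LinkedHashMap<>(); // entry id -> owning consumer
    private int nextUndelivered = 0;                                    // group cursor

    // Appends an entry and returns its id.
    int add(String value) {
        log.add(value);
        return log.size() - 1;
    }

    // Delivers every entry the group has not handed out yet and records it as pending.
    List<Integer> readNeverDelivered(String consumer) {
        List<Integer> delivered = new ArrayList<>();
        while (nextUndelivered < log.size()) {
            pending.put(nextUndelivered, consumer);
            delivered.add(nextUndelivered++);
        }
        return delivered;
    }

    // Transfers ownership of the given pending ids to another consumer.
    void claim(String newOwner, int... ids) {
        for (int id : ids) {
            pending.computeIfPresent(id, (key, owner) -> newOwner);
        }
    }

    // Acknowledges ids and returns how many of them were actually pending.
    long ack(int... ids) {
        long acked = 0;
        for (int id : ids) {
            if (pending.remove(id) != null) {
                acked++;
            }
        }
        return acked;
    }

    // Copy of the current pending map, in delivery order.
    Map<Integer, String> pending() {
        return new LinkedHashMap<>(pending);
    }

    public static void main(String[] args) {
        ConsumerGroupSketch group = new ConsumerGroupSketch();
        int id1 = group.add("1");
        int id2 = group.add("2");
        System.out.println(group.readNeverDelivered("consumer1")); // prints [0, 1]
        group.claim("consumer2", id1, id2);
        System.out.println(group.pending());                       // prints {0=consumer2, 1=consumer2}
        System.out.println(group.ack(id1, id2));                   // prints 2
        System.out.println(group.pending());                       // prints {}
    }
}
```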
Code example of Async interface usage:
RStream<String, String> stream = redisson.getStream("test");
RFuture<StreamMessageId> smFuture = stream.addAsync(StreamAddArgs.entry("0", "0"));
RFuture<Void> groupFuture = stream.createGroupAsync("testGroup");
RFuture<StreamMessageId> id1Future = stream.addAsync(StreamAddArgs.entry("1", "1"));
RFuture<StreamMessageId> id2Future = stream.addAsync(StreamAddArgs.entry("2", "2"));
RFuture<Map<StreamMessageId, Map<String, String>>> groupResultFuture = stream.readGroupAsync("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
// return entries in pending state after read group method execution
RFuture<Map<StreamMessageId, Map<String, String>>> pendingDataFuture = stream.pendingRangeAsync("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);
// wait for the generated ids, which the claim and ack calls below need
StreamMessageId id1 = id1Future.toCompletableFuture().join();
StreamMessageId id2 = id2Future.toCompletableFuture().join();
// transfer ownership of pending messages to a new consumer
RFuture<List<StreamMessageId>> transferedIdsFuture = stream.fastClaimAsync("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);
// mark pending entries as correctly processed
RFuture<Long> amountFuture = stream.ackAsync("testGroup", id1, id2);
amountFuture.whenComplete((res, exception) -> {
// ...
});
Code example of Reactive interface usage:
RedissonReactiveClient redisson = redissonClient.reactive();
RStreamReactive<String, String> stream = redisson.getStream("test");
Mono<StreamMessageId> smMono = stream.add(StreamAddArgs.entry("0", "0"));
Mono<Void> groupMono = stream.createGroup("testGroup");
Mono<StreamMessageId> id1Mono = stream.add(StreamAddArgs.entry("1", "1"));
Mono<StreamMessageId> id2Mono = stream.add(StreamAddArgs.entry("2", "2"));
Mono<Map<StreamMessageId, Map<String, String>>> readGroupMono = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
// return entries in pending state after read group method execution
Mono<Map<StreamMessageId, Map<String, String>>> pendingDataMono = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);
// block() subscribes and waits for the generated ids, which the claim and ack calls below need
StreamMessageId id1 = id1Mono.block();
StreamMessageId id2 = id2Mono.block();
// transfer ownership of pending messages to a new consumer
Mono<List<StreamMessageId>> transferedIdsMono = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);
// mark pending entries as correctly processed
Mono<Long> amountMono = stream.ack("testGroup", id1, id2);
amountMono.doOnNext(res -> {
// ...
}).subscribe();
Code example of RxJava3 interface usage:
RedissonRxClient redisson = redissonClient.rxJava();
RStreamRx<String, String> stream = redisson.getStream("test");
Single<StreamMessageId> smRx = stream.add(StreamAddArgs.entry("0", "0"));
Completable groupRx = stream.createGroup("testGroup");
Single<StreamMessageId> id1Rx = stream.add(StreamAddArgs.entry("1", "1"));
Single<StreamMessageId> id2Rx = stream.add(StreamAddArgs.entry("2", "2"));
Single<Map<StreamMessageId, Map<String, String>>> readGroupRx = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
// return entries in pending state after read group method execution
Single<Map<StreamMessageId, Map<String, String>>> pendingDataRx = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);
// blockingGet() subscribes and waits for the generated ids, which the claim and ack calls below need
StreamMessageId id1 = id1Rx.blockingGet();
StreamMessageId id2 = id2Rx.blockingGet();
// transfer ownership of pending messages to a new consumer
Single<List<StreamMessageId>> transferedIdsRx = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);
// mark pending entries as correctly processed
Single<Long> amountRx = stream.ack("testGroup", id1, id2);
amountRx.doOnSuccess(res -> {
// ...
}).subscribe();
7.20.1. Stream listeners
Redisson allows binding listeners to each RStream object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element added/removed/updated after read operation |
org.redisson.api.ExpiredObjectListener | RStream object expired |
org.redisson.api.DeletedObjectListener | RStream object deleted |
org.redisson.api.listener.StreamAddListener | Element added |
org.redisson.api.listener.StreamRemoveListener | Element removed |
org.redisson.api.listener.StreamCreateGroupListener | Group created |
org.redisson.api.listener.StreamRemoveGroupListener | Group removed |
org.redisson.api.listener.StreamCreateConsumerListener | Consumer created |
org.redisson.api.listener.StreamRemoveConsumerListener | Consumer removed |
org.redisson.api.listener.StreamTrimListener | Stream trimmed |
Usage example:
RStream<String, String> stream = redisson.getStream("anySet");
int listenerId = stream.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
int addListenerId = stream.addListener(new StreamAddListener() {
@Override
public void onAdd(String name) {
// ...
}
});
// ...
stream.removeListener(listenerId);
7.21. Ring Buffer
Java implementation of a Redis-based RingBuffer that implements the java.util.Queue interface. This structure evicts elements from the head when the queue reaches its capacity. This object is fully thread-safe.
It should be initialized with a capacity by the trySetCapacity() method before usage.
Code example:
RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");
// buffer capacity is 4 elements
buffer.trySetCapacity(4);
buffer.add(1);
buffer.add(2);
buffer.add(3);
buffer.add(4);
// buffer state is 1, 2, 3, 4
buffer.add(5);
buffer.add(6);
// buffer state is 3, 4, 5, 6
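The head-eviction rule shown in the comments above can be reproduced in a few lines of plain Java. This is a minimal single-threaded sketch built on java.util.ArrayDeque to illustrate the behaviour. The class RingBufferSketch is hypothetical and is not how Redisson implements RRingBuffer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded illustration of head eviction; not Redisson's implementation.
class RingBufferSketch<T> {
    private final int capacity;
    private final Deque<T> elements = new ArrayDeque<>();

    RingBufferSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Appends at the tail; when the buffer is full, the oldest element (the head) is dropped first.
    void add(T element) {
        if (elements.size() == capacity) {
            elements.pollFirst();
        }
        elements.addLast(element);
    }

    // Immutable copy of the current contents, head first.
    List<T> snapshot() {
        return List.copyOf(elements);
    }

    public static void main(String[] args) {
        RingBufferSketch<Integer> buffer = new RingBufferSketch<>(4);
        for (int i = 1; i <= 4; i++) {
            buffer.add(i);
        }
        System.out.println(buffer.snapshot()); // prints [1, 2, 3, 4]
        buffer.add(5);
        buffer.add(6);
        System.out.println(buffer.snapshot()); // prints [3, 4, 5, 6]
    }
}
```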
Code example of Async interface usage:
RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");
// buffer capacity is 4 elements
RFuture<Boolean> capacityFuture = buffer.trySetCapacityAsync(4);
RFuture<Boolean> addFuture = buffer.addAsync(1);
addFuture = buffer.addAsync(2);
addFuture = buffer.addAsync(3);
addFuture = buffer.addAsync(4);
// buffer state is 1, 2, 3, 4
addFuture = buffer.addAsync(5);
addFuture = buffer.addAsync(6);
// buffer state is 3, 4, 5, 6
addFuture.whenComplete((res, exception) -> {
// ...
});
Code example of Reactive interface usage:
RedissonReactiveClient redisson = redissonClient.reactive();
RRingBufferReactive<Integer> buffer = redisson.getRingBuffer("test");
// buffer capacity is 4 elements
Mono<Boolean> capacityMono = buffer.trySetCapacity(4);
// a Mono does nothing until it is subscribed, so the calls are chained into one pipeline
Mono<Boolean> addMono = capacityMono
        .then(buffer.add(1))
        .then(buffer.add(2))
        .then(buffer.add(3))
        .then(buffer.add(4))
        // buffer state is 1, 2, 3, 4
        .then(buffer.add(5))
        .then(buffer.add(6));
// buffer state is 3, 4, 5, 6 once the pipeline has been subscribed and has completed
addMono.doOnNext(res -> {
// ...
}).subscribe();
Code example of RxJava3 interface usage:
RedissonRxClient redisson = redissonClient.rxJava();
RRingBufferRx<Integer> buffer = redisson.getRingBuffer("test");
// buffer capacity is 4 elements
Single<Boolean> capacityRx = buffer.trySetCapacity(4);
// a Single does nothing until it is subscribed, so the calls are chained into one pipeline
Single<Boolean> addRx = capacityRx
        .flatMap(ok -> buffer.add(1))
        .flatMap(ok -> buffer.add(2))
        .flatMap(ok -> buffer.add(3))
        .flatMap(ok -> buffer.add(4))
        // buffer state is 1, 2, 3, 4
        .flatMap(ok -> buffer.add(5))
        .flatMap(ok -> buffer.add(6));
// buffer state is 3, 4, 5, 6 once the pipeline has been subscribed and has completed
addRx.doOnSuccess(res -> {
// ...
}).subscribe();
7.21.1. Ring Buffer listeners
Redisson allows binding listeners to each RRingBuffer object.
Listener class name | Event description |
---|---|
org.redisson.api.listener.TrackingListener | Element created/removed/updated after read operation |
org.redisson.api.listener.ListAddListener | Element created |
org.redisson.api.listener.ListRemoveListener | Element removed |
org.redisson.api.ExpiredObjectListener | RRingBuffer object expired |
org.redisson.api.DeletedObjectListener | RRingBuffer object deleted |
Usage example:
RRingBuffer<String> queue = redisson.getRingBuffer("anyList");
int listenerId = queue.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
// ...
}
});
// ...
queue.removeListener(listenerId);
7.22. Transfer Queue
Java implementation of a Redis-based TransferQueue that implements the java.util.concurrent.TransferQueue interface. It provides a set of transfer methods which return only when the value has been successfully handed off to a consumer. This object is fully thread-safe.
The poll and take methods are resubscribed automatically during reconnection to the Redis server or failover.
Code example:
RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");
queue.transfer("data");
// or try transfer immediately
queue.tryTransfer("data");
// or try transfer up to 10 seconds
queue.tryTransfer("data", 10, TimeUnit.SECONDS);
// in other thread or JVM
queue.take();
// or
queue.poll();
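The hand-off contract of the transfer methods is defined by java.util.concurrent.TransferQueue, so it can be observed with the JDK's own LinkedTransferQueue. The following is a minimal JDK-only sketch rather than Redisson code; the class TransferSketch and its helper handOff are hypothetical.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical JDK-only illustration of hand-off semantics; not Redisson code.
class TransferSketch {

    // Starts a consumer thread, hands one element to it, and returns what the consumer received.
    static String handOff(TransferQueue<String> queue, String data) {
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer(data); // blocks until the consumer has taken the element
            consumer.join();      // join() makes the consumer's write to `received` visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        // No consumer is waiting yet, so an immediate transfer attempt fails and enqueues nothing.
        System.out.println(queue.tryTransfer("data")); // prints false
        System.out.println(queue.isEmpty());           // prints true
        System.out.println(handOff(queue, "data"));    // prints data
    }
}
```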
Code example of Async interface usage:
RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");
RFuture<Void> transferFuture = queue.transferAsync("data");
// or try transfer immediately
RFuture<Boolean> tryTransferFuture = queue.tryTransferAsync("data");
// or try transfer up to 10 seconds
RFuture<Boolean> timedTransferFuture = queue.tryTransferAsync("data", 10, TimeUnit.SECONDS);
// in other thread or JVM
RFuture<String> takeFuture = queue.takeAsync();
// or
RFuture<String> pollFuture = queue.pollAsync();
pollFuture.whenComplete((res, exception) -> {
// ...
});
Code example of Reactive interface usage:
RedissonReactiveClient redisson = redissonClient.reactive();
RTransferQueueReactive<String> queue = redisson.getTransferQueue("myCountDownLatch");
Mono<Void> transferMono = queue.transfer("data");
// or try transfer immediately
Mono<Boolean> tryTransferMono = queue.tryTransfer("data");
// or try transfer up to 10 seconds
Mono<Boolean> timedTransferMono = queue.tryTransfer("data", 10, TimeUnit.SECONDS);
// in other thread or JVM
Mono<String> takeMono = queue.take();
// or
Mono<String> pollMono = queue.poll();
pollMono.doOnNext(res -> {
// ...
}).subscribe();
Code example of RxJava3 interface usage:
RedissonRxClient redisson = redissonClient.rxJava();
RTransferQueueRx<String> queue = redisson.getTransferQueue("myCountDownLatch");
Completable transferRx = queue.transfer("data");
// or try transfer immediately
Single<Boolean> tryTransferRx = queue.tryTransfer("data");
// or try transfer up to 10 seconds
Single<Boolean> timedTransferRx = queue.tryTransfer("data", 10, TimeUnit.SECONDS);
// in other thread or JVM
Single<String> takeRx = queue.take();
// or
Maybe<String> pollRx = queue.poll();
pollRx.doOnSuccess(res -> {
// ...
}).subscribe();
7.23. Time Series
Java implementation of a Redis-based TimeSeries object that stores values by timestamp and allows defining a TTL (time-to-live) per entry. Values are ordered by timestamp. This object is fully thread-safe.
Code example:
RTimeSeries<String> ts = redisson.getTimeSeries("myTimeSeries");
// timestamps exceed the int range, so the literals need the L suffix
ts.add(201908110501L, "10%");
ts.add(201908110502L, "30%");
ts.add(201908110504L, "10%");
ts.add(201908110508L, "75%");
// entry time-to-live is 10 hours
ts.add(201908110510L, "85%", 10, TimeUnit.HOURS);
ts.add(201908110510L, "95%", 10, TimeUnit.HOURS);
String value = ts.get(201908110508L);
ts.remove(201908110508L);
Collection<String> values = ts.pollFirst(2);
Collection<String> range = ts.range(201908110501L, 201908110508L);
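The timestamp-ordered lookups above behave much like operations on a sorted map keyed by timestamp. The following JDK-only sketch illustrates that idea with java.util.TreeMap. It is an analogy under stated assumptions: the class TimeSeriesSketch is hypothetical, TTL is ignored, range bounds are assumed to be inclusive, and a second add with the same timestamp is assumed to replace the first value.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical JDK-only illustration; it ignores TTL and is not Redisson's implementation.
class TimeSeriesSketch {
    private final NavigableMap<Long, String> entries = new TreeMap<>();

    // A later add with the same timestamp replaces the earlier value (an assumption of this sketch).
    void add(long timestamp, String value) {
        entries.put(timestamp, value);
    }

    // Values with from <= timestamp <= to, ordered by timestamp (inclusive bounds assumed).
    Collection<String> range(long from, long to) {
        return List.copyOf(entries.subMap(from, true, to, true).values());
    }

    // Removes and returns up to `count` values with the smallest timestamps.
    Collection<String> pollFirst(int count) {
        List<String> polled = new ArrayList<>();
        while (polled.size() < count && !entries.isEmpty()) {
            polled.add(entries.pollFirstEntry().getValue());
        }
        return polled;
    }

    public static void main(String[] args) {
        TimeSeriesSketch ts = new TimeSeriesSketch();
        ts.add(201908110501L, "10%");
        ts.add(201908110502L, "30%");
        ts.add(201908110504L, "10%");
        ts.add(201908110508L, "75%");
        System.out.println(ts.range(201908110501L, 201908110504L)); // prints [10%, 30%, 10%]
        System.out.println(ts.pollFirst(2));                        // prints [10%, 30%]
        System.out.println(ts.range(201908110501L, 201908110508L)); // prints [10%, 75%]
    }
}
```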
Code example of Async interface usage:
RTimeSeries<String> ts = redisson.getTimeSeries("myTimeSeries");
RFuture<Void> addFuture = ts.addAsync(201908110501L, "10%");
addFuture = ts.addAsync(201908110502L, "30%");
addFuture = ts.addAsync(201908110504L, "10%");
addFuture = ts.addAsync(201908110508L, "75%");
// entry time-to-live is 10 hours
addFuture = ts.addAsync(201908110510L, "85%", 10, TimeUnit.HOURS);
addFuture = ts.addAsync(201908110510L, "95%", 10, TimeUnit.HOURS);
RFuture<String> getFuture = ts.getAsync(201908110508L);
RFuture<Boolean> removeFuture = ts.removeAsync(201908110508L);
RFuture<Collection<String>> pollFuture = ts.pollFirstAsync(2);
RFuture<Collection<String>> rangeFuture = ts.rangeAsync(201908110501L, 201908110508L);
rangeFuture.whenComplete((res, exception) -> {
// ...
});
Code example of Reactive interface usage:
RedissonReactiveClient redisson = redissonClient.reactive();
RTimeSeriesReactive<String> ts = redisson.getTimeSeries("myTimeSeries");
// a Mono does nothing until it is subscribed, so the adds are chained into one pipeline
Mono<Void> addMono = ts.add(201908110501L, "10%")
        .then(ts.add(201908110502L, "30%"))
        .then(ts.add(201908110504L, "10%"))
        .then(ts.add(201908110508L, "75%"))
        // entry time-to-live is 10 hours
        .then(ts.add(201908110510L, "85%", 10, TimeUnit.HOURS))
        .then(ts.add(201908110510L, "95%", 10, TimeUnit.HOURS));
Mono<String> getMono = ts.get(201908110508L);
Mono<Boolean> removeMono = ts.remove(201908110508L);
Mono<Collection<String>> pollMono = ts.pollFirst(2);
Mono<Collection<String>> rangeMono = ts.range(201908110501L, 201908110508L);
// run the adds first, then read the range
addMono.then(rangeMono).doOnNext(res -> {
// ...
}).subscribe();
Code example of RxJava3 interface usage:
RedissonRxClient redisson = redissonClient.rxJava();
RTimeSeriesRx<String> ts = redisson.getTimeSeries("myTimeSeries");
// a Completable does nothing until it is subscribed, so the adds are chained into one pipeline
Completable addRx = ts.add(201908110501L, "10%")
        .andThen(ts.add(201908110502L, "30%"))
        .andThen(ts.add(201908110504L, "10%"))
        .andThen(ts.add(201908110508L, "75%"))
        // entry time-to-live is 10 hours
        .andThen(ts.add(201908110510L, "85%", 10, TimeUnit.HOURS))
        .andThen(ts.add(201908110510L, "95%", 10, TimeUnit.HOURS));
Maybe<String> getRx = ts.get(201908110508L);
Single<Boolean> removeRx = ts.remove(201908110508L);
Single<Collection<String>> pollRx = ts.pollFirst(2);
Single<Collection<String>> rangeRx = ts.range(201908110501L, 201908110508L);
// run the adds first, then read the range
addRx.andThen(rangeRx).doOnSuccess(res -> {
// ...
}).subscribe();