
InfinityDB in Wireless Sensor Data Collection

InfinityDB is ideal for Java-based sensor data acquisition systems, such as wireless sensor networks. The IoT and sensor data acquisition industry will be expanding at 20% per year for the foreseeable future, and the number of sensors and the sensor data rate are growing exponentially as well. The new RFID battery-less sensor technologies make it possible to have many sensors per computer, further increasing data velocity.

InfinityDB is a fast and efficient Java embedded database management system that can address these challenges. Advantages include:

  • Low memory footprint with no dangerous temporary high usage peaks;
  • Extreme data compression on Flash – an order of magnitude possible;
  • Low power due to efficient individual operations, such as get()/put()/remove(), higher(), lower(), ceiling(), floor();
  • Multi-threaded for concurrent collection and forwarding from multiple sensors;
  • Extreme speed – uses multiple cores, optimized data structures;
  • High speed global data analysis via fast Iterators, forEach(),  Java 8 streams, and parallel scan;
  • Data is ordered by any key – ideal for interleaving and indexing time-series data;
  • Flexible, simple design and usage – it is accessed as a standard java.util.concurrent.ConcurrentNavigableMap;
  • Big DBMS-like features: transactionality, instant recovery, strong typing including CLOBs and BLOBs;
  • Maintenance-free, dynamic in-the-field data structure extension, no upgrades or versioning necessary;
  • Patent-pending.

No other DBMS or file-system-based storage is needed, as the flexibility allows all necessary data to be structured inside InfinityDB in a single file. The data structures are easily programmer-defined and can grow dynamically, allowing in-the-field dynamic enhancement of the database structure without versioning.
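As a rough illustration of the ordered-key access pattern listed above, the sketch below stores timestamped readings in a ConcurrentNavigableMap and queries them with floorEntry(), higherKey(), and subMap(). It uses the JDK's ConcurrentSkipListMap purely as a stand-in, because the InfinityDB classes are not shown here; the class name SensorLog and the sample values are invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SensorLog {
    // Key: timestamp in milliseconds; value: sensor reading.
    // ConcurrentSkipListMap is a JDK stand-in for any ConcurrentNavigableMap.
    private final ConcurrentNavigableMap<Long, Double> readings =
            new ConcurrentSkipListMap<>();

    public void record(long timestampMillis, double value) {
        readings.put(timestampMillis, value);
    }

    // Most recent reading at or before the given time, or null if there is none.
    public Double latestAtOrBefore(long timestampMillis) {
        Map.Entry<Long, Double> entry = readings.floorEntry(timestampMillis);
        return entry == null ? null : entry.getValue();
    }

    // Timestamp of the next reading strictly after the given time, or null.
    public Long nextTimestampAfter(long timestampMillis) {
        return readings.higherKey(timestampMillis);
    }

    // Number of readings with from <= timestamp < to.
    public int countInRange(long from, long to) {
        return readings.subMap(from, to).size();
    }

    public static void main(String[] args) {
        SensorLog log = new SensorLog();
        log.record(1000L, 20.5);
        log.record(2000L, 21.0);
        log.record(3000L, 21.7);
        System.out.println(log.latestAtOrBefore(2500L));
        System.out.println(log.nextTimestampAfter(2000L));
        System.out.println(log.countInRange(1000L, 3000L));
    }
}
```

Because the code depends only on the ConcurrentNavigableMap interface, any implementation of that interface could in principle be substituted at the construction site.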

InfinityDB has been in use by Rockwell for years in monitoring oil and gas wellhead performance in the Middle East.

Please contact rlderan2@boilerbay.com to get more information.

 

 


DZone Faster Streams with AirConcurrentMap Article

One of the main advantages of the Java 8 streams feature is its ability to take advantage of parallelism for many common operations, keeping multiple cores busy simultaneously and increasing performance. Streaming out the contents from a Map in parallel can be very fast. However, performance depends dramatically on Map size, and we will show this with actual performance test code and results graphs.

We will talk here about:

  • The performance improvement you can actually expect for some common Map stream operations using parallelism, in particular a simple value summer;
  • The Java Microbenchmark Harness (JMH), which can provide precise, fair, repeatable performance tests, and which we present with example code. Extensive JMH tests for Maps are on GitHub at boilerbay/airconcurrentmap;
  • AirConcurrentMap, a java.util.concurrent.ConcurrentNavigableMap with its own parallelism technique that is faster than any JDK Map. For medium to large Maps, AirConcurrentMap is also faster than any JDK Map for Iteration and forEach(), faster than ConcurrentSkipListMap for get(), put(), remove(), higher(), lower(), ceiling(), and floor(), and the most memory efficient.

One would expect the parallel streams mode always to be faster, but sometimes serial mode wins, and sometimes non-streams techniques like forEach() win.  There are different setup times for different techniques, and for large Maps, performance generally decreases, probably due to caching effects.  The JDK Maps are also very ‘peaky’.

Actual testing is required in order to gauge the effects, which we will do here, but only for streams. We will test a streams-based value summer as well as a new parallelism technique for AirConcurrentMap, which was written by the author.

Basic Stream-Based Summing

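Here is a simple streams-based summer for the values of a Map. There is a convenient sum() method, but it is defined on the primitive streams such as LongStream rather than on Stream<Long>, so the values are first mapped to long:

Map<Object, Long> map = new HashMap<Object, Long>();
// fill in...
long result = map.values().stream().mapToLong(Long::longValue).sum();

Now to make it parallel, all that is necessary is to chain in a .parallel() invocation:

long result = map.values().stream().parallel().mapToLong(Long::longValue).sum();

This sum() is actually just a handy way to do a reduce() on the stream, so it is the equivalent of:

long result = map.values().stream().parallel().mapToLong(Long::longValue).reduce(0L, (x, y) -> x + y);

The conversion to a native LongStream is also what gives the best possible performance, because the reduction then works on primitive long values rather than on boxed Longs. The benchmark code later in this article writes the mapping with an explicit cast:

long result = map.values().stream().parallel()
    .mapToLong(v -> ((Long)v).longValue())
    .sum();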
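The summing variants can be tried side by side in a small self-contained class. This is only a sketch for experimenting: the class name StreamSumDemo and the sampleMap() helper are made up here, and a real comparison of speed should use JMH rather than a main() method.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamSumDemo {
    // Boxed reduction directly on the Stream<Long>.
    static long boxedReduce(Map<Object, Long> map) {
        return map.values().stream().reduce(0L, Long::sum);
    }

    // Serial primitive LongStream.
    static long serialSum(Map<Object, Long> map) {
        return map.values().stream().mapToLong(Long::longValue).sum();
    }

    // Parallel primitive LongStream.
    static long parallelSum(Map<Object, Long> map) {
        return map.values().stream().parallel().mapToLong(Long::longValue).sum();
    }

    // Hypothetical helper: a Map holding the values 1..size.
    static Map<Object, Long> sampleMap(int size) {
        Map<Object, Long> map = new HashMap<>();
        for (long i = 1; i <= size; i++) {
            map.put(i, i);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Object, Long> map = sampleMap(1000);
        System.out.println(boxedReduce(map));
        System.out.println(serialSum(map));
        System.out.println(parallelSum(map));
    }
}
```

All three methods compute the same total; they differ only in boxing and in whether the common fork/join pool is used.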

The forEach() Approach

Let’s step back for a moment to compare the streams-based approach to the alternatives as background. These techniques can be faster for small Maps.

This will help introduce the optional AirConcurrentMap approach we look at later.

The  Map.forEach(BiConsumer)  mechanism generally beats Iterators by about 0% to 20%. Iterators are slower probably for several reasons, although JVM runtime code optimization is so effective that these are only general hints:

  • The state of an Iterator is kept in its instance variables, while the loop inside forEach()  uses local variables;
  • Iterators require two method invocations per loop, although hasNext()  may be very simple;
  • An Iterator's next() must be prepared to throw NoSuchElementException in case it is invoked without first checking hasNext(). A Map may be iterated without using hasNext() at all!
  • An Iterator over the  entrySet()  must sometimes construct and return a  Map.Entry ; and
  • It may be necessary to check for  ConcurrentModificationException  using a modification counter or other means for non-concurrent Maps.
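For comparison, here is a minimal sketch of the two loop styles contrasted in the list above, applied to the same value-summing task. The class name IterationStyles and the sampleMap() helper are illustrative only, and nothing here measures speed; it simply shows where the per-entry hasNext()/next() calls and the Map.Entry objects appear in the Iterator form and disappear in the forEach() form.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IterationStyles {
    // Iterator style: two method invocations per entry plus a Map.Entry per step.
    static long sumWithIterator(Map<Object, Long> map) {
        long sum = 0;
        Iterator<Map.Entry<Object, Long>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            sum += it.next().getValue();
        }
        return sum;
    }

    // forEach style: the Map runs the loop itself and hands over key and value.
    static long sumWithForEach(Map<Object, Long> map) {
        long[] sum = new long[1]; // one-element array as a mutable holder
        map.forEach((k, v) -> sum[0] += v);
        return sum[0];
    }

    // Hypothetical helper: a Map holding the values 1..size.
    static Map<Object, Long> sampleMap(int size) {
        Map<Object, Long> map = new HashMap<>();
        for (long i = 1; i <= size; i++) {
            map.put(i, i);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Object, Long> map = sampleMap(100);
        System.out.println(sumWithIterator(map));
        System.out.println(sumWithForEach(map));
    }
}
```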

A forEach() Example

For speed with small Maps, a summer using  forEach(BiConsumer)  can be implemented as follows. The  BiConsumer  lambda requires that local variables such as the running sum in the enclosing scope be effectively final, and we have to resort to some kind of ‘Long holder’. This way, the ‘Long holder’ reference is final, but its contents are not, and we get the effect of a closure. Alternatively, a primitive long non-final instance variable in an enclosing class scope can be used directly. An  AtomicLong  is a convenient ‘mutable Long’. It does not actually need to be atomic because  Map.forEach()  is single-threaded.

final AtomicLong sum = new AtomicLong();
map.forEach((x, y) -> sum.set(sum.get() + y));
return sum.get();

The above is not very tidy compared to streaming, and it becomes unwieldy as the computation becomes more elaborate, but it has very little startup cost and it does not necessarily require more lines of code. Also, the  AtomicLong.set()  and  get()  and the implied boxing and  longValue()  invocations may be inefficient, putting an additional load on the Just-In-Time compiler. (Note that  AtomicLong.addAndGet()  does an internal ‘lock-free’ atomic loop buried inside native code, which is slower.) We can help out the JIT by putting the sum in a primitive instance variable: this can actually help sometimes, although whether to try to do this kind of optimization is very controversial! We really do get about 0% to 10% improvement, but it is unpredictable. (You have to look at the native compiled code to convince yourself this is true, but it is very hard to figure out.) So, we might do this (an anonymous inner class won’t work):

class SummingConsumer implements BiConsumer<Object, Long> {
    long sum = 0;

    public void accept(Object k, Long v) {
        sum += v;
    }
}
SummingConsumer summingConsumer = new SummingConsumer();
map.forEach(summingConsumer);
return summingConsumer.sum;

An alternative AirConcurrentMap technique is almost identical but faster, partially because instead of implementing the  BiConsumer  interface, one extends a new  MapVisitor  class, avoiding the extra overhead of interface method invocation. Such interface overhead is often – but not always – optimized away by the JIT, depending, for example, on the dynamic set of currently loaded, compiled and actively executable classes and interfaces. There is no polymorphism here to complicate the optimization. A 0% to 20% improvement is actually seen, but is unpredictable.

// com.infinitydb.map.MapVisitor is specific to AirConcurrentMap
class SummingMapVisitor extends MapVisitor<Object, Long> {
    long sum = 0;

    public void visit(Object k, Long v) {
        sum += v;
    }
}
// 0 to 20% faster than the BiConsumer interface
SummingMapVisitor summingMapVisitor = new SummingMapVisitor();
airConcurrentMap.visit(summingMapVisitor);
return summingMapVisitor.sum;

The Fork/Join Pool and Spliterators

Now let’s drop into a brief discussion of the streams parallelism implementation – this is just background.

The streams parallelism is implemented internally using a  ForkJoinPool , which is a kind of  AbstractExecutorService . The pool is a set of Threads managed by the JVM that are scheduled dynamically, each with an input that is a queue of ForkJoinTask  elements. The management of the queues is complex, involving ‘work stealing’ and other optimizations that would be inordinately delicate for clients to handle themselves.
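To make the fork/join vocabulary concrete, here is a minimal sketch of a RecursiveTask (a ForkJoinTask subclass) that sums an array by splitting it in half until the pieces are small. It is not the code the streams library runs internally; the class name ForkJoinSum and the THRESHOLD of 1,000 elements are arbitrary choices for the illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cutoff for this sketch

    private final long[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    ForkJoinSum(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum serially in this worker thread.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSum left = new ForkJoinSum(values, from, mid);
        ForkJoinSum right = new ForkJoinSum(values, mid, to);
        left.fork();                     // queue the left half; another worker may steal it
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    static long sum(long[] values) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(values, 0, values.length));
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(sum(values));
    }
}
```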

The queue elements for streams wrap Spliterator implementations provided by the Map, which move through the Map, splitting themselves recursively at times. Each Spliterator instance executes its own code serially, but different instances execute in parallel. If a Map implementation does not provide a Spliterator, one is provided by default that uses arrays, normally with lower performance and more temporary memory usage.

It can be difficult for a  Spliterator  to decide when to split in such a way that all of the Threads finish at once, hence there can be a ‘tail’ at the end of the stream operation while fewer and fewer cores are used. This is reduced using ‘work stealing’. A  Spliterator  may also have trouble in judging when to split, especially if the tree is imbalanced, and it is necessary for it not to split too often, creating too much garbage and needlessly long queues, or to split too rarely, limiting parallelism.
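The splitting step can be observed directly by calling trySplit() by hand. The sketch below splits the values Spliterator of a ConcurrentSkipListMap once and then drains both pieces; here both are drained on the calling thread, whereas the streams machinery would hand them to different workers. The class name SplitDemo and the sample() helper are invented for the example.

```java
import java.util.Spliterator;
import java.util.concurrent.ConcurrentSkipListMap;

public class SplitDemo {
    // Split the values once and sum both pieces.
    static long sumBySplitting(ConcurrentSkipListMap<Long, Long> map) {
        Spliterator<Long> rest = map.values().spliterator();
        // trySplit() returns a Spliterator over part of the elements,
        // or null if this Spliterator declines to split (for example, too few elements).
        Spliterator<Long> prefix = rest.trySplit();
        long[] sum = new long[1];
        if (prefix != null) {
            prefix.forEachRemaining(v -> sum[0] += v);
        }
        rest.forEachRemaining(v -> sum[0] += v);
        return sum[0];
    }

    // Hypothetical helper: a Map holding the entries 1..size.
    static ConcurrentSkipListMap<Long, Long> sample(int size) {
        ConcurrentSkipListMap<Long, Long> map = new ConcurrentSkipListMap<>();
        for (long i = 1; i <= size; i++) {
            map.put(i, i);
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(sumBySplitting(sample(1000)));
    }
}
```

Whether and where a split happens is up to the Map's Spliterator, which is why the total is the only thing the sketch relies on.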

A Map need not be capable of concurrency itself in order to be scanned in parallel, so it need not implement   java.util.concurrent.ConcurrentMap  except as a protection against undefined behavior resulting from modification during the scan. A non-concurrent Map will sometimes throw   ConcurrentModificationException  on concurrent modification, but this cannot be relied upon.
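The difference in behavior under modification can be seen even on a single thread. In the sketch below, a new key is added while the key set is being iterated: a HashMap detects the structural change on the next step and throws, while a ConcurrentHashMap tolerates it. This is the simple, deterministic case only; the JDK documents the HashMap check as best-effort, so with real multi-threaded modification it cannot be counted on. The class and method names are illustrative.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ModificationDuringScan {
    // Returns true if modifying the Map during a key scan threw
    // ConcurrentModificationException.
    static boolean throwsOnModification(Map<Integer, Integer> map) {
        for (int i = 0; i < 10; i++) {
            map.put(i, i);
        }
        try {
            for (Integer key : map.keySet()) {
                // The first call adds the new key -1 (a structural modification);
                // later calls only replace its value.
                map.put(-1, key);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    static boolean hashMapThrows() {
        return throwsOnModification(new HashMap<>());
    }

    static boolean concurrentHashMapThrows() {
        return throwsOnModification(new ConcurrentHashMap<>());
    }

    public static void main(String[] args) {
        System.out.println("HashMap threw: " + hashMapThrows());
        System.out.println("ConcurrentHashMap threw: " + concurrentHashMapThrows());
    }
}
```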

The Alternative AirConcurrentMap Parallelism Model

AirConcurrentMap , a   java.util.concurrent.ConcurrentNavigableMap , can optionally use its own internal Map-optimized Thread pool to provide high performance. All of the regular techniques are available as well.

In contrast to the JDK Map-embedded Spliterator  approach, the client provides its own  ThreadedMapVisitor  subclass of  MapVisitor , which resembles a simplified ForkJoinTask . The client’s  ThreadedMapVisitor  subclass explicitly defines the actions to take on forking and joining, while the Map chooses when actually to instantiate them. The subclass can contain internal state, propagating it to children and aggregating it from them. For example, collections of results can be aggregated recursively.

The parallel summer looks like this:

class ParallelSummer extends ThreadedMapVisitor<Object, Long> {
    long sum = 0;

    long getSum(VisitableMap<Object, Long> map) {
        map.visit(this);
        return sum;
    }

    // Implement MapVisitor, the superclass of ThreadedMapVisitor
    @Override
    public void visit(Object k, Long v) {
        sum += v.longValue();
    }

    // Implement ThreadedMapVisitor For parallelism
    @Override
    public ParallelSummer split() {
        return new ParallelSummer();
    }

    // Implement ThreadedMapVisitor For parallelism
    @Override
    public void merge(ThreadedMapVisitor tmv) {
        sum += ((ParallelSummer)tmv).sum;
    }
}

Then, at the point of use, the ParallelSummer  is constructed and applied to the Map:

long result = new ParallelSummer().getSum(map);

Thus the code is more complex in that it requires the reusable summer implementation, but then it requires only one line of client code. Further optimizations are encapsulated and propagated to all client uses.
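For readers without the library at hand, the split/merge idea can be imitated on a JDK Map. The sketch below is emphatically not the AirConcurrentMap API or its algorithm: SketchVisitor is a hypothetical stand-in class, and the driver naively cuts the key range once at a caller-supplied pivot and scans the two halves on two threads. It only shows how a stateful visitor can create a child with split(), let the child accumulate separately, and fold its state back in with merge().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SplitMergeSketch {
    // Hypothetical stand-in for a visitor with split/merge hooks.
    static abstract class SketchVisitor<K, V> {
        abstract void visit(K key, V value);
        abstract SketchVisitor<K, V> split();
        abstract void merge(SketchVisitor<K, V> child);
    }

    static class Summer extends SketchVisitor<Long, Long> {
        long sum = 0;

        @Override
        void visit(Long key, Long value) {
            sum += value;
        }

        @Override
        Summer split() {
            return new Summer(); // a child starts with its own empty state
        }

        @Override
        void merge(SketchVisitor<Long, Long> child) {
            sum += ((Summer) child).sum; // aggregate the child's state
        }
    }

    // Naive driver: keys below the pivot go to a child on another thread,
    // the remaining keys are visited by the root on the calling thread.
    static <V> void visitInTwoHalves(ConcurrentNavigableMap<Long, V> map,
                                     long pivot, SketchVisitor<Long, V> root) {
        SketchVisitor<Long, V> child = root.split();
        CompletableFuture<Void> lower =
                CompletableFuture.runAsync(() -> map.headMap(pivot).forEach(child::visit));
        map.tailMap(pivot).forEach(root::visit);
        lower.join();      // wait for the child before reading its state
        root.merge(child);
    }

    static long parallelSum(ConcurrentNavigableMap<Long, Long> map, long pivot) {
        Summer summer = new Summer();
        visitInTwoHalves(map, pivot, summer);
        return summer.sum;
    }

    // Hypothetical helper: a Map holding the entries 1..size.
    static ConcurrentNavigableMap<Long, Long> sample(int size) {
        ConcurrentNavigableMap<Long, Long> map = new ConcurrentSkipListMap<>();
        for (long i = 1; i <= size; i++) {
            map.put(i, i);
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(sample(1000), 500L));
    }
}
```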

The Peakyness Problem

This graph represents the measured parallel streams performance of  java.util.concurrent.ConcurrentSkipListMap  for value summing over a logarithmic size scale, as well as the AirConcurrentMap speed. This is on a 2.4GHz quad-core X86-64.  ConcurrentSkipListMap  was the fastest JDK Map measured.

As can be seen, the light blue shows a serial streams peak at about 200 Entries, while the purple shows parallel streams peak at about 80K Entries. Parallel streams reaches almost zero below 100 Entries, while serial streams nears zero above about 100K Entries, hence it is vital to know an approximate Map size in advance. The green is serial AirConcurrentMap and red is parallel AirConcurrentMap: there are peaks, but no zeros, and the performance is nowhere worse than  ConcurrentSkipListMap . The data for these graphs and more can be generated by the code in github boilerbay/airconcurrentmap in test/src/com/infinitydb/map/test. (The graph was created from the data by some R scripts to be provided in the future).

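[Graph: value-summing performance versus Map size (logarithmic scale) for serial and parallel streams on ConcurrentSkipListMap and for serial and parallel AirConcurrentMap]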

(image Copyright (C) 2017, Roger L. Deran)

The Java Microbenchmark Harness

To do performance testing of small pieces of code that are to execute quickly, the Java Microbenchmark Harness (JMH) is excellent. It uses Annotations to designate portions of the code for various purposes like setup and target methods, automatically doing the looping, process forks, warmup iterations, timing, and statistics analysis. If you are familiar with JUnit 4 or later, this Annotation idea will be familiar. Tests can be parameterized in order to sweep over various selected use cases. JMH uses Apache Maven, so it is very easy to set up.

The JMH Tests

We use JMH here to test performance for JDK parallel streams and AirConcurrentMap parallel visitors. The source for the code here is in github. There is also JMH test code there for:

  • get(), put(), remove(), higher(), lower(), ceiling(), and floor(),
  • Iterators,
  • forEach() and MapVisitors, and
  • memory efficiency.

The results are below the code.

We have implemented a SummingVisitor class with a getSum(Map) method that can fall back to the streams approach, so that it can be used on an AirConcurrentMap or any other Map, with optimum performance in any case. All we have to do is instantiate and use a summer in the benchmark method testSummingStream(). Because the SummingVisitor extends ThreadedMapVisitor instead of just MapVisitor, it will always be used in a parallel mode if the Map supports it (there are other VisitableMaps in the works and ‘wrappers’ for Lists, Sets, arrays, and java.util.Maps to convert them to VisitableMaps, and these are sometimes capable of parallelism).

This test is parameterized, so it runs over a selected set of Map classes and a selected set of exponentially increasing Map sizes to show a good spectrum of use cases. To run it, see the instructions in the actual github code.

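import java.util.Map;
import java.util.Random;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
// ThreadedMapVisitor and VisitableMap come from the AirConcurrentMap
// distribution; their imports are omitted here (see the github source).

@State(Scope.Benchmark)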
public class StreamsJMHAirConcurrentMapTest {
    @Param({
      "com.infinitydb.map.air.AirConcurrentMap",
      "java.util.concurrent.ConcurrentSkipListMap",
      "java.util.concurrent.ConcurrentHashMap"
    })
    static String mapClassName;
    @Param({ "0", "1", "10", "100", "1000", "10000", "100000", "1000000", "10000000" })
    static long mapSize;
    static Map<Object, Long> map;
    @Setup(Level.Trial)
    static public void setup() throws InstantiationException,
      IllegalAccessException, ClassNotFoundException {
        Class<Map<Object, Long>> mapClass =
        (Class<Map<Object, Long>>)Class.forName(mapClassName);
        map = mapClass.newInstance();
        Random random = new Random(System.nanoTime());
        System.gc();
        // Load up the Map
        for (long i = 0; i < mapSize; i++) {
            long v = random.nextLong();
            map.put(v, v);
        }
        System.gc();
    }

    @Benchmark
    public static long testSummingStream() {
        // Client code.
        return new SummingVisitor().getSum(map);
    }

    static class SummingVisitor extends ThreadedMapVisitor<Object, Long> {
        long sum = 0;

        long getSum(Map<Object, Long> map) {
            if (map instanceof VisitableMap) {
                // Use the fast AirConcurrentMap parallel scan
                ((VisitableMap)map).visit(this);
                return sum;
            } else {
                // Drop back to slower streams.
                // The code for sum() is just a reduce, giving the same
                // performance
                return map.values().stream().parallel()
                  .mapToLong(v -> ((Long)v).longValue())
                  .reduce(0L, (x, y) -> x + y);
            }
        }

        /*
        * implement MapVisitor for speed. Invoked when used with a VisitableMap
        * such as AirConcurrentMap. Similar to BiConsumer.
        */
        @Override
        public void visit(Object k, Long v) {
            sum += v.longValue();
        }

        // Implement ThreadedMapVisitor For parallelism
        @Override
        public SummingVisitor split() {
            return new SummingVisitor();
        }

        // Implement ThreadedMapVisitor For parallelism
        @Override
        public void merge(ThreadedMapVisitor tmv) {
            sum += ((SummingVisitor)tmv).sum;
        }
    }
}

Here are the results. The HashMap and TreeMap results are similar – why not try them for yourself?

# Run complete. Total time: 01:58:59
Benchmark                                         (mapClassName)                              (mapSize)  Mode   Cnt         Score        Error  Units
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap             0  thrpt  200  47669627.157 ± 408068.881  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap             1  thrpt  200  36128245.803 ± 219021.093  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap            10  thrpt  200  28819134.716 ± 215535.681  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap           100  thrpt  200   5983782.906 ±  12171.457  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap          1000  thrpt  200    503450.631 ±   2160.534  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap         10000  thrpt  200     51363.052 ±    192.871  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap        100000  thrpt  200      8785.362 ±    180.963  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap       1000000  thrpt  200       280.375 ±      1.321  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  com.infinitydb.map.air.AirConcurrentMap      10000000  thrpt  200        18.017 ±      0.070  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap          0  thrpt  200  11715613.910 ±  30641.713  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap          1  thrpt  200  10587246.514 ±  25303.461  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap         10  thrpt  200    476882.573 ±  69406.838  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap        100  thrpt  200     92033.529 ±   4059.402  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap       1000  thrpt  200     49317.703 ±    620.204  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap      10000  thrpt  200     17426.535 ±    503.417  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap     100000  thrpt  200      2666.609 ±     96.302  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap    1000000  thrpt  200       166.188 ±      2.526  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentSkipListMap   10000000  thrpt  200         4.473 ±      0.136  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap              0  thrpt  200  11748597.966 ±  55376.295  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap              1  thrpt  200   7591252.605 ±  58655.123  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap             10  thrpt  200    192421.408 ±   1781.336  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap            100  thrpt  200     93215.872 ±   1053.339  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap           1000  thrpt  200     65290.405 ±    592.948  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap          10000  thrpt  200     15459.798 ±     37.701  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap         100000  thrpt  200      1527.065 ±     11.372  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap        1000000  thrpt  200        70.491 ±      1.110  ops/s
StreamsJMHAirConcurrentMapTest.testSummingStream  java.util.concurrent.ConcurrentHashMap       10000000  thrpt  200         6.195 ±      0.034  ops/s
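To read relative speed off the table, one can divide throughputs at the same Map size. The small sketch below does this for three of the sizes, using scores copied from the results above; the class name and the choice of sizes are ours, and the error margins are ignored, so the ratios are only approximate.

```java
public class SpeedupFromResults {
    // Scores (ops/s) copied from the results above for three Map sizes.
    static final long[] SIZES = { 1_000L, 100_000L, 10_000_000L };
    static final double[] AIR = { 503450.631, 8785.362, 18.017 };
    static final double[] SKIP_LIST = { 49317.703, 2666.609, 4.473 };
    static final double[] HASH_MAP = { 65290.405, 1527.065, 6.195 };

    // Throughput ratio: how many times faster the candidate is than the baseline.
    static double speedup(double candidate, double baseline) {
        return candidate / baseline;
    }

    public static void main(String[] args) {
        for (int i = 0; i < SIZES.length; i++) {
            System.out.printf("size %d: %.1fx vs ConcurrentSkipListMap, %.1fx vs ConcurrentHashMap%n",
                    SIZES[i],
                    speedup(AIR[i], SKIP_LIST[i]),
                    speedup(AIR[i], HASH_MAP[i]));
        }
    }
}
```

At 10,000,000 entries, for instance, the ratio against ConcurrentSkipListMap comes out at roughly 4.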

Summary

Java 8 streams is a powerful and terse tool for semi-declarative operations on Maps using a transparent Fork/Join pool for easy parallelism. However, it is important to observe that there are peaks and near zeros in the performance of each mode when used with Maps of various sizes. Also, it is possible for a parallel fork/join operation to be delayed by a tail of ‘straggler tasks’. The client code is not always one-line, and optimizations like .mapToLong(v -> ((Long)v).longValue()) are not always obvious.

AirConcurrentMap uses its own internal, optimized thread pool with an ‘inverted’ fork/join pattern and other optimizations to avoid these problems, and it allows the client to determine the split and merge functionality explicitly for more performance and a different kind of flexibility, allowing stateful tasks.

The reader is encouraged to try the more extensive JMH tests, which represent an easy and fair way to experiment with any of the characteristics of Maps in general and to see the other AirConcurrentMap advantages.

The author at rlderan2@boilerbay.com would be interested in further testing done by the reader.

See boilerbay.com


New InfinityDB 4.0 Map interface

There is a new Map-based access path that greatly simplifies the use of InfinityDB. The InfinityDBMap and InfinityDBSet classes can wrap a standard database, providing the standard java.util.concurrent.ConcurrentNavigableMap and corresponding Set interfaces. These standard interfaces are more general than any other standard Map or Set. The java.util.NavigableMap is an improvement over the older java.util.SortedMap.

Use is as simple as this:

Map<String, String> map = new InfinityDBMap<String, String>(db);
map.put("key", "hello world!");
System.out.println("value=" + map.get("key"));
for (String key : map.keySet()) 
  System.out.println("key=" + key);

This is a vast simplification over the ‘engine’ level, which is still available for speed and compatibility. The documentation mostly covers the original API, but the src/com/infinitydb/map/db/MapHelloWorld.java shows the new mode extensively, including trivial through advanced use patterns such as demo JSON and XML printing. An important idea is the ‘nested Map’ mode, in which a new Map is constructed inside another. See the manual also.

Map<String, String> nestedMap = map.getMap("key");
nestedMap.put("sub_key", "hello world!");
System.out.println("nestedMap: " + nestedMap.get("sub_key"));

The entire proprietary source code for the new API is provided for reference and documentation purposes. It is in src/com/infinitydb/map/**.

Try it out!

 


New JMH tests and faster forEach

The popular Java Microbenchmark Harness performance testing tool is supported now with an AirConcurrentMap test specifically for it. The test shows basically the same high performance as the original ones included in the distribution, but now the JMH harness shows the performance in a more familiar and simpler way. This lends further support to our dramatic performance claims. The test also writes a file ‘memoryReport.txt’ which shows the memory efficiency differences per entry for the compared Maps.

Also, the Map.forEach() method is now much faster, because it is supported in a way that uses the AirConcurrentMap native MapVisitor technique internally and transparently. Now forEach() is almost as fast as MapVisitor. Much existing code will be many times faster without any effort other than changing the Map constructors. The higher performance can be seen in the new JMH tests. The performance increase starts at about 1K Entries and becomes very significant above about 10K Entries, finally reaching a factor of about 3 to 5.

 

 


Test AirConcurrentMap vs the Standard Java Maps

Embedded Team

Download AirConcurrentMap and try the tests in TwitterDemo.java. You can compare how the standard Java Maps do versus how AirConcurrentMap does, and decide when it is worthwhile to use AirConcurrentMap. You will see that AirConcurrentMap offers the fastest iterators, the best speed in concurrent, ordered operations, and the best memory management.