Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I'm attempting to create a ConcurrentHashMap that supports "snapshots" in order to provide consistent iterators, and am wondering if there's a more efficient way to do this. The problem is that if two iterators are created at the same time then they need to read the same values, and the definition of the concurrent hash map's weakly consistent iterators does not guarantee this to be the case. I'd also like to avoid locks if possible: there are several thousand values in the map and processing each item takes several dozen milliseconds, and I don't want to have to block writers during this time as this could result in writers blocking for a minute or longer.

What I have so far:

  1. The ConcurrentHashMap's keys are Strings, and its values are instances of ConcurrentSkipListMap<Long, T>
  2. When an element is added to the hashmap with putIfAbsent, then a new skiplist is allocated, and the object is added via skipList.put(System.nanoTime(), t).
  3. To query the map, I use map.get(key).lastEntry().getValue() to return the most recent value. To query a snapshot (e.g. with an iterator), I use map.get(key).lowerEntry(iteratorTimestamp).getValue(), where iteratorTimestamp is the result of System.nanoTime() called when the iterator was initialized.
  4. If an object is deleted, I use map.get(key).put(timestamp, SnapShotMap.DELETED), where DELETED is a static final object.

Questions:

  1. Is there a library that already implements this? Or barring that, is there a data structure that would be more appropriate than the ConcurrentHashMap and the ConcurrentSkipListMap? My keys are comparable, so maybe some sort of concurrent tree would better support snapshots than a concurrent hash table.
  2. How do I prevent this thing from continually growing? I can delete all of the skip list entries with keys less than X (except for the last key in the map) after all iterators that were initialized on or before X have completed, but I don't know of a good way to determine when this has happened: I can flag that an iterator has completed when its hasNext method returns false, but not all iterators are necessarily going to run to completion; I can keep a WeakReference to an iterator so that I can detect when it's been garbage collected, but I can't think of a good way to detect this other than by using a thread that iterates through the collection of weak references and then sleeps for several minutes - ideally the thread would block on the WeakReference and be notified when the wrapped reference is GC'd, but I don't think this is an option.

    ConcurrentSkipListMap<Long, WeakReference<Iterator>> iteratorMap;
    while(true) {
        long latestGC = 0;
        for(Map.Entry<Long, WeakReference<Iterator>> entry : iteratorMap.entrySet()) {
            if(entry.getValue().get() == null) {
                iteratorMap.remove(entry.getKey());
                latestGC = entry.getKey();
            } else break;
        }
        // remove ConcurrentHashMap entries with timestamps less than `latestGC`
        Thread.sleep(300000); // five minutes
    }
    

Edit: To clear up some confusion in the answers and comments, I'm currently passing weakly consistent iterators to code written by another division in the company, and they have asked me to increase the strength of the iterators' consistency. They are already aware of the fact that it is infeasible for me to make 100% consistent iterators, they just want a best effort on my part. They care more about throughput than iterator consistency, so coarse-grained locks are not an option.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
317 views
Welcome To Ask or Share your Answers For Others

1 Answer

What is your actual use case that requires a special implementation? From the Javadoc of ConcurrentHashMap (emphasis added):

Retrievals reflect the results of the most recently completed update operations holding upon their onset. ... Iterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration. They do not throw ConcurrentModificationException. However, iterators are designed to be used by only one thread at a time.

So the regular ConcurrentHashMap.values().iterator() will give you a "consistent" iterator, but only for one-time use by a single thread. If you need to use the same "snapshot" multiple times and/or by multiple threads, I suggest making a copy of the map.

EDIT: With the new information and the insistence for a "strongly consistent" iterator, I offer this solution. Please note that the use of a ReadWriteLock has the following implications:

  • Writes will be serialized (only one writer at a time) so write performance may be impacted.
  • Concurrent reads are allowed as long as there is no write in progress, so read performance impact should be minimal.
  • Active readers block writers but only as long as it takes to retrieve the reference to the current "snapshot". Once a thread has the snapshot, it no longer blocks writers no matter how long it takes to process the information in the snapshot.
  • Readers are blocked while any write is active; once the write finishes then all readers will have access to the new snapshot until a new write replaces it.

Consistency is achieved by serializing the writes and making a copy of the current values on each and every write. Readers that hold a reference to a "stale" snapshot can continue to use the old snapshot without worrying about modification, and the garbage collector will reclaim old snapshots as soon as no one is using it any more. It is assumed that there is no requirement for a reader to request a snapshot from an earlier point in time.

Because snapshots are potentially shared among multiple concurrent threads, the snapshots are read-only and cannot be modified. This restriction also applies to the remove() method of any Iterator instances created from the snapshot.

import java.util.*;
import java.util.concurrent.locks.*;

public class StackOverflow16600019 <K, V> {
    private final ReadWriteLock locks = new ReentrantReadWriteLock();
    private final HashMap<K,V> map = new HashMap<>();
    private Collection<V> valueSnapshot = Collections.emptyList();

    public V put(K key, V value) {
        locks.writeLock().lock();
        try {
            V oldValue = map.put(key, value);
            updateSnapshot();
            return oldValue;
        } finally {
            locks.writeLock().unlock();
        }
    }

    public V remove(K key) {
        locks.writeLock().lock();
        try {
            V removed = map.remove(key);
            updateSnapshot();
            return removed;
        } finally {
            locks.writeLock().unlock();
        }
    }

    public Collection<V> values() {
        locks.readLock().lock();
        try {
            return valueSnapshot; // read-only!
        } finally {
            locks.readLock().unlock();
        }
    }

    /** Callers MUST hold the WRITE LOCK. */
    private void updateSnapshot() {
        valueSnapshot = Collections.unmodifiableCollection(
            new ArrayList<V>(map.values())); // copy
    }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...