HashMap与ConcurrentHashMap:线程之间的传输

问题描述

我有一个关于在多线程应用程序中使用映射的问题。假设我们有这样的情况:

  1. 线程接收List<Map<String,Object>>作为json数据的json数据,由Jackson Json反序列化。
  2. 该线程修改收到的地图。
  3. 然后将列表放入阻止队列中,以供另一个线程使用。

如您所见,map仅由单线程修改,但随后它变成“只读”(无杂项,不再被修改)并传递给另一个线程。接下来,当我研究HasMap(也是TreeMap)和ConcurrentHashMap的实现时,后者具有volatile字段,而前两个字段则没有。那么,在这种情况下我应该使用Map的哪个实现? ConcurrentHashMap是过大的选择,还是由于线程间传输而必须使用

我的简单测试表明,HashMap/TreeMap可以被同步修改并且可以使用,但是我的结论或测试代码可能是错误的:

def map = new TreeMap() // or HashMap
def start = new CountDownLatch(1)
def threads = (1..5)
println("Threads: " + threads)
def created = new CountDownLatch(threads.size())
def completed = new CountDownLatch(threads.size())
threads.each {i ->
    new Thread({
        def from = i * 10
        def to = from + 10
        def local = (from..to)
        println(Thread.currentThread().name + " " + local)
        created.countDown()
        start.await()
        println('Mutating by ' + local)
        local.each {number ->
            synchronized (map) {
                map.put(number,ThreadLocalRandom.current().nextInt())
            }
            println(Thread.currentThread().name + ' added ' + number +  ': ' + map.keySet())
        }
        println 'Done: ' + Thread.currentThread().name
        completed.countDown()
    }).start()
}

created.await()
start.countDown()
completed.await()
println('Completed:')
map.each { e ->
    println('' + e.key + ': ' + e.value)
}

主线程产生5个子线程,当它们完成主线程成功看到子线程的所有更新时,它们会同步更新公共地图。

解决方法

java.util.concurrent类具有special guarantees有关排序的信息:

内存一致性影响:与其他并发集合一样,在将对象放入BlockingQueue happen-before 操作之前,线程中的操作将在访问或删除该元素之后进行BlockingQueue在另一个线程中。

这意味着您可以自由使用任何种类的可变对象并根据需要对其进行操作,然后将其放入队列。检索到该文件后,您已应用的所有操作都将可见。

(通常,请注意,您演示的那种测试只能证明缺乏;在大多数实际情况下,不同步的代码在99%的时间内都能正常工作。最后1%咬你。)

,

这个问题范围很广。

您的原始情况

您说:

[A]映射仅由单线程修改,但随后“变为”只读

棘手的部分是单词“ then”。程序员说“然后”时,您指的是“时钟时间”,例如我已经做到了,现在要做。但是由于各种各样的原因,计算机不会以这种方式“思考”(执行代码)。之前发生的事情以及之后发生的事情都需要手动进行同步,以使计算机以我们看到的方式看到世界。

这就是Java内存模型表达内容的方式:如果希望对象在并发环境中可预测地运行,则必须确保在边界之前建立“偶然发生”。

在Java代码中,在关系建立之前有一些事情要发生。简化一下,仅举几例:

  • 单个线程中的执行顺序(如果语句1和2由同一线程以该顺序执行,则1所做的任何事情始终可以由语句2看到)
  • 当线程t1 start()进入t2时,t2可以看到t1在开始t2之前所做的一切。与join()
  • 相互对应
  • synchronized相同,对象监视:由同步块内的线程执行的每个动作都可由在同一实例上同步的另一个线程看到
  • java.util.concurrent类的任何专用方法都相同。例如,锁和信号量,当然还有集合:如果将元素放入同步集合中,则将其拉出的线程在放入它的线程上会发生事前事件。
  • 如果T2在T1之前发生过,并且T3在T2发生了一次,那么T3在T1也发生了。

回到你的短语

然后它“变成”只读

它确实变成只读的。但是,为了让计算机看到它,您必须赋予“ then”一个含义;即:您必须在代码中放入happen before relationship

稍后再声明:

然后将列表放入阻止队列

一个java.util.concurrent队列?多么整洁!碰巧的是,从并发队列中拉出对象的线程与将所述对象放入队列的线程之间存在与代表关系的“先发生”关系。

您已经建立了代理关系。将对象放入队列的线程(之前)进行的所有更改都可以通过将其拉出而安全地看到。在这种情况下,您不需要ConcurrentHashMap(当然,如果没有其他线程会更改相同的数据)。

您的示例代码

您的示例代码不使用队列。而且它会更改由多个线程修改的单个映射(而不是您的方案所提到的相反方式)。所以,只是...不一样。但是无论哪种方式,您的代码都可以。

访问地图的线程这样做:

synchronized (map) {
    map.put(number,ThreadLocalRandom.current().nextInt())
}

synchornize提供了1)线程互斥和2)之前发生的情况。因此,每个进入同步化的线程都会在另一个也在其上同步化的线程(所有线程)中看到“之前发生的所有事件”。

所以这里没问题。

然后您的主线程执行:

completed.await()
println('Completed:')
map.each { e ->
   println('' + e.key + ': ' + e.value)
}

在这里拯救您的是completed.await()。这将在所有名为countDown()的线程(所有线程)之前建立一个条件。因此,您的主线程将看到工作线程完成的所有操作。一切都很好。

除了...我们经常忘记检查线程的引导程序。工作者第一次在地图实例上同步时,以前没有人做过。我们如何确定他们看到的地图实例已完全初始化并准备就绪。

嗯,有两个原因:

  1. 您可以在调用thread.start()之前初始化地图实例,这会在之前建立一个事件。这足够了
  2. 在工作线程中,您还可以在开始工作之前使用闩锁,然后再建立关系。

您非常安全。