caching - Ignite Write 内部结构

标签 caching ignite gridgain

我正在使用 Ignite 1.7.0 并正在测试 Apache Ignite 的后写功能。提出这个问题的目的是为了更好地了解在 Apache Ignite 中启用 write behind 功能时幕后发生的情况。

我有一个 Ignite 客户端程序,它将 20 个条目插入到测试缓存中(称之为“test_cache”)。

Ignite 服务器在同一台机器上运行,但在不同的 JVM 上。

Ignite 缓存具有以下配置设置:

  1. Read through、Write Through 和 Write behind 已启用。
  2. 齐平大小为 13
  3. 刷新线程数为 1

所有其他属性都设置为默认值。

除此之外还有一个为缓存配置的缓存存储,代码如下:

package com.ignite.genericpoc;

import java.util.Collection;
import java.util.Map;

import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;

public class IgniteStoreTest implements CacheStore<String, String> {

@IgniteInstanceResource
Ignite gridReference;

@CacheNameResource
String cacheName;

@Override
public String load(String key) throws CacheLoaderException {
    System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] ");
    return null;
}

@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {

    IgniteCache<String, String> ic = gridReference.cache(cacheName);

    int currentKeyNo = 0;

    for (String key : keys) {
        ic.put(key, "Value:" + currentKeyNo);
        currentKeyNo++;
    }

    System.out.println("Got " + currentKeyNo + " entries");

    return null;
}

@Override
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException {
    System.out.println("Write method called");
}

@Override
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException {
    System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread "
            + Thread.currentThread().getName());

    System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString());

    try {
        Thread.sleep(60000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

@Override
public void delete(Object key) throws CacheWriterException {
    System.out.println("Delete method called");
}

@Override
public void deleteAll(Collection<?> keys) throws CacheWriterException {
    System.out.println("Delete All method called");
}

@Override
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException {
    System.out.println("Load cache method called with " + args[0].toString());
}

@Override
public void sessionEnd(boolean commit) throws CacheWriterException {
    System.out.println("Session End called");
}

}

我有意按顺序在 writeAll() 方法中调用了 Thread.sleep() 方法,以模拟缓慢的数据库写入。

将数据加载到缓存中的 Ignite 客户端代码如下:

package com.ignite.genericpoc;

import java.util.ArrayList;
import java.util.List;

import javax.cache.configuration.FactoryBuilder;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;

public class IgnitePersistentStoreClientTest {

public static void main(String[] args) throws InterruptedException {

    List<String> addressess = new ArrayList<>();

    addressess.add("*.*.*.*:47500"); // Hiding the IP 

    Ignition.setClientMode(true);

    Ignite i = IgniteConfigurationUtil.startIgniteServer(
            IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess));

    System.out.println("Client Started");

    CacheConfiguration<String, String> ccfg = new CacheConfiguration<>();

    ccfg.setName("Persistent_Store_Test_Cache");

    ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class));

    ccfg.setReadThrough(true);

    ccfg.setWriteThrough(true);

    ccfg.setWriteBehindEnabled(true);

    ccfg.setWriteBehindFlushSize(13);

    ccfg.setWriteBehindFlushThreadCount(1);

    System.out.println(ccfg.getWriteBehindBatchSize());

    IgniteCache<String, String> ic = i.getOrCreateCache(ccfg);

    System.out.println("Cache Created");

    for (int t = 1; t <= 20; t++) {
        System.out.println("Loading key "+t);
        ic.put("Key:" + t,"Value: "+t);
        System.out.println("Key "+ t + " loaded ");
    }

    System.out.println("Cache Loaded");

    i.close();

}

}

执行过程如下:

  1. 首先启动 Ignite 服务器。

  2. 加载数据的 Ignite 客户端在服务器之后启动。

  3. 因为在 writeAll() 方法上定义了 60 秒的休眠,Ignite 客户端在写入第 20 个条目时卡住了。

  4. 此外,我可以在服务器日志中看到为两个线程调用了 writeAll() 方法,其中 Flush 线程已收到 15 个条目以写入存储,系统线程已收到 1 个条目写信给商店。 Ignite Server 日志如下:

    在线程 flusher-0-#66%test_grid% 中写入为 [ 15 ] 个条目调用的所有方法

    在线程 sys-#22%test_grid% 中写入为 [ 1 ] 条目调用的所有方法

我可以理解 Ignite Client put 在写入 20 条目时卡住了,因为 Write Behind 缓存已满并且所有 Flush 线程也都在忙于写入数据。

以下是我需要弄清楚的几点:

  • 为什么客户端在插入第 20 个条目时被阻塞,它应该在插入第 14 个条目时被阻塞(基于 13 个条目的最大缓存大小)

  • 为什么 Flush 线程只调用了 15 个条目而不是所有 19 个条目,因为我没有设置批处理大小,它默认为 512。

  • 使用 writeAll() 方法调用的系​​统线程是否与处理来自 Ignite 客户端的放置第 20 个条目的请求的线程相同。

  • 考虑到我的缓存已启用后写且写入顺序模式为 PRIMARY_SYNC(默认)并且缓存中没有备份,在主节点能够提交写入之前应阻止对缓存的任何 put 调用.这是否也意味着能够将条目放入 Write Behind 缓存中。

  • 在服务器中存储条目的情况下,Ignite Server 是否会制作条目的两份副本,一份用于存储,一份用于后写缓存。或者是否使用了相同条目的引用。

感谢您耐心看完问题。如果问题太长,我深表歉意,但内容对于向相关听众详细说明情况至关重要。

最佳答案

后写存储在引擎盖下有背压控制。这意味着如果系统无法处理所有异步操作,则可以即时将其转换为同步操作。
如果底层后写缓存的大小超过临界大小(flushSize * 1.5),将使用正在执行写操作的线程而不是flusherThread。
这就是您在日志中看到这些线程的原因:

  • flusher-0-#66%test_grid%(常规冲洗线程)
  • sys-#22%test_grid%(背压控制运行,运行使用客户端线程)

Considering my Cache has write behind enabled and Write Order Mode is PRIMARY_SYNC ( default ) and there are no backups in the cache, any put call to the cache should be blocked until the primary node is able to commit the write. Does this also mean able to put the entry in the Write Behind cache.

是的,确实如此。

In case of storing an entry in the server, does Ignite Server makes two copies of the entry one for storage and one for the write behind cache. Or is the same entry's reference used.

应使用相同条目的引用。

让我们逐步考虑这个场景:

  • 客户端线程已上传 14 个条目。 GridCacheWriteBehindStore 检测到底层缓存中的条目数量超过刷新大小并发送信号以唤醒刷新线程。 请参阅 GridCacheWriteBehindStore#updateCache()

  • flusher 线程唤醒并尝试通过 write-behind-cache.entrySet().iterator() 从后写缓存(即 ConcurrentLinkedHashMap)获取数据。 此迭代器提供弱一致性遍历,即不保证它反射(reflect)构造后的任何修改。 重要的是客户端线程并行放置新条目。

  • 客户端线程将最后一个值放入 [key=Key:20, val=Value: 20]。同时,flusher线程被writeAll()方法中的Thread.sleep()阻塞。 GridCacheWriteBehindStore 检测到当前后写缓存大小超过临界大小(flush size * 1.5),因此应使用背压机制。 GridCacheWriteBehindStore 调用 flushSingleValue() 方法以从后写缓存中刷新最旧的值(当然,此值不应由刷新线程之前获取)。 flushSingleValue() 方法在客户端线程的上下文中被调用。

  • 之后,flusher 线程唤醒并处理剩余的条目。

希望对理解后写存储实现有所帮助。

谢谢!

关于caching - Ignite Write 内部结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47548547/

相关文章:

laravel - 如何在 Laravel 中使用原子锁?

java - Hazelcast NearCache 对性能没有预期的影响

python - 如何在 Django 中设置 Memcached 检索超时

python - 在测试中覆盖 Django 缓存设置

java - Apache 点燃: ScanQuery giving exception

java - 单个 Ignite 节点不会停止 TCP 发现

java - Gridgain:java.lang.OutOfMemoryError:超出 GC 开销限制

ignite - Apache Ignite 缓存中的 SQL 查询

java - 常规和 Spring 抽象的 Ignite 事务失败,并在手动重新连接后出现 IllegalStateException,并将 clientReconnectDisabled 设置为 true

ignite - GridGain Server 分区丢失