Elasticsearch文档并发更新冲突解决方案
1. 乐观锁机制实现
分布式环境中ES的异步并发特性导致文档复制请求乱序到达,可能引发新版本被旧版本覆盖的问题。通过版本号控制可确保操作顺序:
// 连接配置示例
TransportClient esClient = null;
@Before
public void initConnection() {
Settings config = Settings.builder()
.put("cluster.name", "es_cluster").build();
try {
esClient = new PreBuiltTransportClient(config)
.addTransportAddress(new InetSocketTransportAddress(
InetAddress.getByName("es-node1"), 9300));
} catch (Exception e) { e.printStackTrace(); }
}
// 并发更新测试
@Test
public void concurrentUpdateTest() throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 8; i++) {
pool.execute(new UpdateWorker());
}
Thread.sleep(15000);
pool.shutdown();
}
class UpdateWorker implements Runnable {
public void run() {
try {
IndexRequest initReq = new IndexRequest("products", "item", "101")
.source(XContentFactory.jsonBuilder()
.startObject()
.field("sku", "P101")
.endObject());
UpdateRequest updateReq = new UpdateRequest("products", "item", "101")
.doc(XContentFactory.jsonBuilder()
.startObject()
.field("thread_id", Thread.currentThread().getId())
.endObject())
.upsert(initReq);
esClient.update(updateReq).get();
} catch (Exception e) { e.printStackTrace(); }
}
}
@After
public void closeClient() { esClient.close(); }
ES通过_version确保操作顺序,当旧版本后到达时自动忽略:
PUT /inventory/items/201
{ "name": "T-shirt", "stock": 50 }
带版本更新:
PUT /inventory/items/201?version=1
{ "name": "T-shirt", "stock": 45 }
版本冲突时将返回409状态码。
2. 外部版本控制
使用version_type=external时,仅当提供版本号大于当前版本才允许更新:
PUT /orders/details/305?version=10&version_type=external
{ "status": "shipped" }
若提供版本号小于当前版本将操作失败。
3. 冲突重试机制
多用户场景下使用retry_on_conflict参数自动重试:
POST /logs/access/1/_update?retry_on_conflict=3
{
"script": "ctx._source.visit_count+=1",
"upsert": { "visit_count": 1 }
}
设置retry_on_conflict=3表示失败时最多重试3次。
4. 悲观锁实践
通过全局锁限制并发:
PUT /system/locks/global/_create
{}
文档级锁使用Groovy脚本:
// lock_script.groovy
if (ctx._source.lock_owner != owner_id) {
assert false
}
ctx.op = 'noop'
共享/排他锁实现:
// shared_lock.groovy
if (ctx._source.lock_mode == 'exclusive') {
assert false
} else {
ctx._source.lock_counter++
}
释放锁:
// unlock_script.groovy
if (ctx._source.lock_mode == "shared") {
ctx._source.lock_counter--
};
if (ctx._source.lock_counter == 0) {
ctx.op = 'delete'
}