Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.Await;
import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
Expand Down Expand Up @@ -193,6 +195,7 @@
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.DataNode.DataNodeContext;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService;
import org.apache.iotdb.db.service.metrics.FileMetrics;
Expand Down Expand Up @@ -416,6 +419,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface

private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();

private final DataNodeContext dataNodeContext;

private final ExecutorService schemaExecutor =
new WrappedThreadPoolExecutor(
0,
Expand All @@ -430,10 +435,33 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface

private static final String SYSTEM = "system";

public DataNodeInternalRPCServiceImpl() {
public DataNodeInternalRPCServiceImpl(DataNodeContext dataNodeContext) {
super();
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
this.dataNodeContext = dataNodeContext;
}

private long consensusWaitTimeoutSeconds = 30;

private TSStatus waitForConsensusStarted() {
if (dataNodeContext.isAllConsensusStarted()) {
return null;
}
try {
Await.await()
.atMost(consensusWaitTimeoutSeconds, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(dataNodeContext::isAllConsensusStarted);
return null;
} catch (AwaitTimeoutException e) {
LOGGER.warn(
"Consensus has not been started after {} seconds, rejecting region request",
consensusWaitTimeoutSeconds);
return RpcUtils.getStatus(
TSStatusCode.CONSENSUS_NOT_INITIALIZED,
"Consensus has not been started after " + consensusWaitTimeoutSeconds + " seconds");
}
}

@Override
Expand Down Expand Up @@ -624,11 +652,19 @@ private TLoadResp createTLoadResp(final TSStatus resultStatus) {

@Override
public TSStatus createSchemaRegion(final TCreateSchemaRegionReq req) {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
return regionManager.createSchemaRegion(req.getRegionReplicaSet(), req.getStorageGroup());
}

@Override
public TSStatus createDataRegion(TCreateDataRegionReq req) {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
return regionManager.createDataRegion(req.getRegionReplicaSet(), req.getStorageGroup());
}

Expand Down Expand Up @@ -2616,6 +2652,10 @@ public TSStatus updateTemplate(final TUpdateTemplateReq req) {

@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
if (consensusGroupId instanceof DataRegionId) {
Expand Down Expand Up @@ -2644,6 +2684,12 @@ public TRegionLeaderChangeResp changeRegionLeader(TRegionLeaderChangeReq req) {
LOGGER.info("[ChangeRegionLeader] {}", req);
TRegionLeaderChangeResp resp = new TRegionLeaderChangeResp();

TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
resp.setStatus(consensusStatus);
return resp;
}

TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
TConsensusGroupId tgId = req.getRegionId();
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
Expand Down Expand Up @@ -2713,6 +2759,10 @@ private boolean isLeader(ConsensusGroupId regionId) {

@Override
public TSStatus createNewRegionPeer(TCreatePeerReq req) {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
List<Peer> peers =
Expand All @@ -2733,6 +2783,10 @@ public TSStatus createNewRegionPeer(TCreatePeerReq req) {

@Override
public TSStatus addRegionPeer(TMaintainPeerReq req) {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
Expand All @@ -2751,6 +2805,10 @@ public TSStatus addRegionPeer(TMaintainPeerReq req) {

@Override
public TSStatus removeRegionPeer(TMaintainPeerReq req) {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
Expand All @@ -2769,6 +2827,10 @@ public TSStatus removeRegionPeer(TMaintainPeerReq req) {

@Override
public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req);
Expand All @@ -2788,6 +2850,10 @@ public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) {
// TODO: return which DataNode fail
@Override
public TSStatus resetPeerList(TResetPeerListReq req) throws TException {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
return RegionMigrateService.getInstance().resetPeerList(req);
}

Expand All @@ -2798,6 +2864,10 @@ public TRegionMigrateResult getRegionMaintainResult(long taskId) throws TExcepti

@Override
public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws TException {
TSStatus consensusStatus = waitForConsensusStarted();
if (consensusStatus != null) {
return consensusStatus;
}
RegionMigrateService.getInstance().notifyRegionMigration(req);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
Expand Down Expand Up @@ -3439,4 +3509,8 @@ private List<ByteBuffer> serializeDatabaseScopedTableList(

return result;
}

public void setConsensusWaitTimeoutSeconds(long consensusWaitTimeoutSeconds) {
this.consensusWaitTimeoutSeconds = consensusWaitTimeoutSeconds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,16 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
private static final String REGISTER_INTERRUPTION =
"Unexpected interruption when waiting to register to the cluster";

private boolean schemaRegionConsensusStarted = false;
private boolean dataRegionConsensusStarted = false;
private volatile boolean schemaRegionConsensusStarted = false;
private volatile boolean dataRegionConsensusStarted = false;
private static Thread watcherThread;
private DataNodeContext context;

public DataNode() {
super("DataNode");
// We do not init anything here, so that we can re-initialize the instance in IT.
DataNodeHolder.INSTANCE = this;
context = new DataNodeContext();
}

public static void reinitializeStatics() {
Expand Down Expand Up @@ -934,7 +936,9 @@ private void setUpRPCService() throws StartupException {

protected void registerInternalRPCService() throws StartupException {
// Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
registerManager.register(DataNodeInternalRPCService.getInstance());
DataNodeInternalRPCService instance = DataNodeInternalRPCService.getInstance();
instance.setDataNodeContext(context);
registerManager.register(instance);
}

// make it easier for users to extend ClientRPCServiceImpl to export more rpc services
Expand Down Expand Up @@ -1373,4 +1377,10 @@ private DataNodeHolder() {
// Empty constructor
}
}

public class DataNodeContext {
public boolean isAllConsensusStarted() {
return dataRegionConsensusStarted && schemaRegionConsensusStarted;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.thrift.handler.InternalServiceThriftHandler;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
import org.apache.iotdb.db.service.DataNode.DataNodeContext;
import org.apache.iotdb.db.service.metrics.DataNodeInternalRPCServiceMetrics;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService.Processor;
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
Expand All @@ -44,6 +45,7 @@ public class DataNodeInternalRPCService extends ThriftService
private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();

private final AtomicReference<DataNodeInternalRPCServiceImpl> impl = new AtomicReference<>();
private DataNodeContext dataNodeContext;

private DataNodeInternalRPCService() {}

Expand All @@ -54,9 +56,9 @@ public ServiceType getID() {

@Override
public void initTProcessor() {
impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
DataNodeInternalRPCServiceImpl service = getImpl();
initSyncedServiceImpl(null);
processor = new Processor<>(impl.get());
processor = new Processor<>(service);
}

@Override
Expand Down Expand Up @@ -109,7 +111,7 @@ public int getBindPort() {
}

public DataNodeInternalRPCServiceImpl getImpl() {
impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl(dataNodeContext));
return impl.get();
}

Expand All @@ -122,4 +124,8 @@ private DataNodeInternalRPCServiceHolder() {}
public static DataNodeInternalRPCService getInstance() {
return DataNodeInternalRPCServiceHolder.INSTANCE;
}

public void setDataNodeContext(DataNodeContext dataNodeContext) {
this.dataNodeContext = dataNodeContext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.protocol.thrift.impl;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.DataNode.DataNodeContext;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.Collections;

import static org.mockito.Mockito.when;

public class ConsensusWaitTest {

@BeforeClass
public static void setUp() {
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
}

private DataNodeInternalRPCServiceImpl createServiceWithConsensusState(boolean started) {
DataNodeContext context = Mockito.mock(DataNodeContext.class);
when(context.isAllConsensusStarted()).thenReturn(started);
DataNodeInternalRPCServiceImpl service = new DataNodeInternalRPCServiceImpl(context);
service.setConsensusWaitTimeoutSeconds(1);
return service;
}

private TCreateSchemaRegionReq createSchemaRegionReq() {
TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
req.setStorageGroup("root.test");
TRegionReplicaSet replicaSet = new TRegionReplicaSet();
replicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0));
TDataNodeLocation location = new TDataNodeLocation();
location.setDataNodeId(0);
location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
replicaSet.setDataNodeLocations(Collections.singletonList(location));
req.setRegionReplicaSet(replicaSet);
return req;
}

private TCreateDataRegionReq createDataRegionReq() {
TCreateDataRegionReq req = new TCreateDataRegionReq();
req.setStorageGroup("root.test");
TRegionReplicaSet replicaSet = new TRegionReplicaSet();
replicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
TDataNodeLocation location = new TDataNodeLocation();
location.setDataNodeId(0);
location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
replicaSet.setDataNodeLocations(Collections.singletonList(location));
req.setRegionReplicaSet(replicaSet);
return req;
}

@Test
public void testCreateSchemaRegionRejectsWhenConsensusNotStarted() {
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
TSStatus status = service.createSchemaRegion(createSchemaRegionReq());
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode());
}

@Test
public void testCreateDataRegionRejectsWhenConsensusNotStarted() {
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
TSStatus status = service.createDataRegion(createDataRegionReq());
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode());
}

@Test
public void testDeleteRegionRejectsWhenConsensusNotStarted() {
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
TSStatus status = service.deleteRegion(groupId);
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode());
}

@Test
public void testChangeRegionLeaderRejectsWhenConsensusNotStarted() {
DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false);
TRegionLeaderChangeReq req = new TRegionLeaderChangeReq();
req.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
TDataNodeLocation newLeader = new TDataNodeLocation();
newLeader.setDataNodeId(0);
newLeader.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
newLeader.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
newLeader.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
req.setNewLeaderNode(newLeader);
TRegionLeaderChangeResp resp = service.changeRegionLeader(req);
Assert.assertEquals(
TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), resp.getStatus().getCode());
}
}
Loading
Loading