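Cassandra Insert Flow Analysis
This article walks through how Cassandra processes an INSERT, based on the native source code of Cassandra 3.11.
1) Start with the official DataStax driver. The client code is as follows: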
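```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class InsertExample {
    public static void main(String[] args) {
        String query = "INSERT INTO USER (uid, group_id, nick) VALUES (1, 1, 'ram_test');";
        // Connect to a local node; Cluster and Session are closed automatically
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {
            session.execute("USE tp");   // switch to keyspace "tp"
            session.execute(query);      // the INSERT traced in this article
            System.out.println("query executed");
        }
    }
}
```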
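2) The code above inserts one row into Cassandra. The rest of this article follows that write into Cassandra step by step. When session.execute(query) is called, the driver wraps the CQL string in a SimpleStatement object and then calls this.executeAsync(statement) to execute it: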
```java
public ResultSetFuture executeAsync(final Statement statement) {
    if (this.isInit) {
        // Session already initialized: build the request and send it right away
        DefaultResultSetFuture future = new DefaultResultSetFuture(this, this.cluster.manager.protocolVersion(), this.makeRequestMessage(statement, (ByteBuffer) null));
        (new RequestHandler(this, future, statement)).sendRequest();
        return future;
    } else {
        // Not initialized yet: return a chained future and send once init completes
        final ChainedResultSetFuture chainedFuture = new ChainedResultSetFuture();
        this.initAsync().addListener(new Runnable() {
            public void run() {
                DefaultResultSetFuture actualFuture = new DefaultResultSetFuture(SessionManager.this, SessionManager.this.cluster.manager.protocolVersion(), SessionManager.this.makeRequestMessage(statement, (ByteBuffer) null));
                SessionManager.this.execute(actualFuture, statement);
                chainedFuture.setSource(actualFuture);
            }
        }, this.executor());
        return chainedFuture;
    }
}
```
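The branching in executeAsync() follows a common lazy-initialization pattern. If the session is ready, the request is sent immediately. Otherwise, the caller gets a placeholder future that completes once initialization and the actual send have both finished.

Below is a minimal sketch of that pattern using java.util.concurrent.CompletableFuture. The class LazySession and its methods are hypothetical. They are not part of the DataStax driver, and the "send" is a stand-in that returns a string instead of talking to a server.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not driver code): send immediately when initialized,
// otherwise chain the send onto the initialization future.
class LazySession {
    private volatile boolean isInit = false;
    private CompletableFuture<Void> initFuture; // created on the first initAsync() call

    synchronized CompletableFuture<Void> initAsync() {
        if (initFuture == null) {
            // Stand-in for connecting to the cluster; marks the session as ready.
            initFuture = CompletableFuture.runAsync(() -> isInit = true);
        }
        return initFuture;
    }

    // Stand-in for building a request message and writing it to a connection.
    private CompletableFuture<String> sendRequest(String query) {
        return CompletableFuture.completedFuture("executed: " + query);
    }

    CompletableFuture<String> executeAsync(String query) {
        if (isInit) {
            return sendRequest(query);
        }
        // Plays the role of ChainedResultSetFuture: completes after init + send.
        return initAsync().thenCompose(ignored -> sendRequest(query));
    }
}
```

thenCompose chains the send onto initialization without blocking the calling thread. The driver achieves the same effect with a listener plus ChainedResultSetFuture.setSource().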
3) The makeRequestMessage method shows how the request is built. Based on the negotiated protocol version, it produces a Request (a subclass of Message), which is sent to Cassandra over TCP.
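What finally travels over TCP is a frame defined by the CQL native protocol specification. As an illustration, the sketch below hand-encodes a protocol v4 QUERY frame with java.nio.ByteBuffer. The frame has a 9-byte header (version, flags, stream id, opcode, body length) followed by the query as a [long string], a [short] consistency level, and a flags byte.

The class name QueryFrameSketch is hypothetical, and this is not how the driver's encoder is written. It covers only the simplest case: no bound values, no paging, no compression.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a CQL native protocol v4 QUERY request frame.
// Only the mandatory fields are written; optional query parameters are omitted.
class QueryFrameSketch {
    static final byte VERSION_V4_REQUEST = 0x04; // request direction, protocol v4
    static final byte OPCODE_QUERY = 0x07;
    static final short CONSISTENCY_ONE = 0x0001;

    static ByteBuffer encodeQuery(short streamId, String cql) {
        byte[] query = cql.getBytes(StandardCharsets.UTF_8);
        // Body: [long string] query + [short] consistency + [byte] query flags
        int bodyLength = 4 + query.length + 2 + 1;
        ByteBuffer buf = ByteBuffer.allocate(9 + bodyLength); // big-endian by default
        buf.put(VERSION_V4_REQUEST);   // version
        buf.put((byte) 0);             // header flags: no compression, no tracing
        buf.putShort(streamId);        // stream id, echoed back in the response
        buf.put(OPCODE_QUERY);         // opcode
        buf.putInt(bodyLength);        // length of the body that follows
        buf.putInt(query.length);      // [long string]: int length prefix
        buf.put(query);                // ... followed by UTF-8 bytes
        buf.putShort(CONSISTENCY_ONE); // consistency level
        buf.put((byte) 0);             // query flags: none set
        buf.flip();                    // prepare the buffer for reading/writing out
        return buf;
    }
}
```

The stream id in the header lets the server's response be matched to its request on a shared connection. It reappears on the server side as request.getStreamId().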
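Next, let's see how Cassandra receives and handles the message. TCP communication on the server side uses the Netty framework.
In Message.java (package org.apache.cassandra.transport), the Dispatcher class is the entry point for handling TCP requests. The ProtocolDecoder class decodes incoming frames into Message instances, and channelRead0() processes each message. The core of channelRead0() is: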
```java
try
{
    assert request.connection() instanceof ServerConnection;
    connection = (ServerConnection)request.connection();
    if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
        ClientWarn.instance.captureWarnings();

    QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());

    logger.trace("Received: {}, v={}", request, connection.getVersion());
    response = request.execute(qstate, queryStartNanoTime);
    response.setStreamId(request.getStreamId());
    response.setWarnings(ClientWarn.instance.getWarnings());
    response.attach(connection);
    connection.applyStateTransition(request.type, response.type);
}
catch (Throwable t)
{
    JVMStabilityInspector.inspectThrowable(t);
    UnexpectedChannelExceptionHandler handler = new UnexpectedChannelExceptionHandler(ctx.channel(), true);
    flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame()));
    return;
}
finally
{
    ClientWarn.instance.resetWarnings();
}
```
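The snippet above wraps request.execute() in three steps:

- It captures per-request warnings in a thread-local (ClientWarn).
- It copies the request's stream id and the warnings into the response.
- It turns any failure into an error message carrying the same stream id, and always resets the warnings in finally.

A minimal sketch of that pattern follows. DispatcherSketch, Response, and Handler are hypothetical types, not Cassandra classes, and the protocol-version check is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Cassandra code) of the capture/attach/reset pattern
// that channelRead0() wraps around request.execute().
class DispatcherSketch {
    // Per-thread warning buffer, in the spirit of ClientWarn.instance.
    private static final ThreadLocal<List<String>> WARNINGS = new ThreadLocal<>();

    static final class Response {
        final int streamId;
        final String body;
        final List<String> warnings;
        Response(int streamId, String body, List<String> warnings) {
            this.streamId = streamId;
            this.body = body;
            this.warnings = warnings;
        }
    }

    interface Handler {
        String execute(String request);
    }

    // Called from request-handling code; ignored when no capture is active.
    static void warn(String msg) {
        List<String> w = WARNINGS.get();
        if (w != null) {
            w.add(msg);
        }
    }

    static Response dispatch(int streamId, String request, Handler handler) {
        WARNINGS.set(new ArrayList<>()); // like captureWarnings()
        try {
            String body = handler.execute(request);
            // The response carries the request's stream id so the client can match it.
            return new Response(streamId, body, List.copyOf(WARNINGS.get()));
        } catch (RuntimeException e) {
            // Failures become an error response with the same stream id.
            return new Response(streamId, "ERROR: " + e.getMessage(), List.of());
        } finally {
            WARNINGS.remove(); // like resetWarnings(); runs on every path
        }
    }
}
```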
4) The execute() method of the Request class is where the CQL is actually executed. For the CQL in this example, the concrete Message is a QueryMessage. In QueryMessage.execute(), follow this call:
```java
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);
```
5) Step into the process method:
```java
public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime)
throws RequestExecutionException, RequestValidationException
{
    ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
    options.prepare(p.boundNames);
    CQLStatement prepared = p.statement;
    if (prepared.getBoundTerms() != options.getValues().size())
        throw new InvalidRequestException("Invalid amount of bind variables");

    if (!queryState.getClientState().isInternal)
        metrics.regularStatementsExecuted.inc();

    return processStatement(prepared, queryState, options, queryStartNanoTime);
}
```
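process() rejects a statement when the number of bind markers does not match the number of supplied values. Cassandra gets the marker count from the parsed statement (getBoundTerms()), not from the raw string.

The hypothetical helper below only approximates the check. It counts anonymous ? markers that appear outside single-quoted literals. Named markers such as :name are not handled.

```java
import java.util.List;

// Hypothetical approximation of the "Invalid amount of bind variables" check.
// Counts anonymous '?' markers outside single-quoted CQL string literals.
class BindCheckSketch {
    static int countMarkers(String cql) {
        int count = 0;
        boolean inString = false;
        for (int i = 0; i < cql.length(); i++) {
            char c = cql.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so literal state stays correct.
                inString = !inString;
            } else if (c == '?' && !inString) {
                count++;
            }
        }
        return count;
    }

    static void validate(String cql, List<?> values) {
        if (countMarkers(cql) != values.size()) {
            throw new IllegalArgumentException("Invalid amount of bind variables");
        }
    }
}
```

For the query in step 1 there are no markers and no values, so both counts are 0 and the check passes.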
6) First, analyze ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState()):
```java
public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
    Tracing.trace("Parsing {}", queryStr);
    ParsedStatement statement = parseStatement(queryStr);

    // Set keyspace for statement that require login
    if (statement instanceof CFStatement)
        ((CFStatement)statement).prepareKeyspace(clientState);

    Tracing.trace("Preparing statement");
    return statement.prepare(clientState);
}
```
7) Start with QueryProcessor.parseStatement() to see how the CQL is parsed. For this example, the relevant snippet is in the cqlStatement() method of the Cql_Parser class:
```java
case 2 :
    // Parser.g:210:7: st2= insertStatement
    {
    pushFollow(FOLLOW_insertStatement_in_cqlStatement88);
    st2=insertStatement();
    state._fsp--;

     stmt = st2;
    }
    break;
```
8) Step into insertStatement() and look at normalInsertStatement(). This method declares two local variables:
```java
List<ColumnDefinition.Raw> columnNames = new ArrayList<>();
List<Term.Raw> values = new ArrayList<>();
```
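9) columnNames holds the columns to insert and values holds the values to insert. The method returns new UpdateStatement.ParsedInsert(cf, attrs, columnNames, values, ifNotExists).
Keep this UpdateStatement.ParsedInsert instance in mind: it is where the columns and values to insert are stored. At this point the ParsedStatement is ready.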
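To make the result of parsing concrete, the toy parser below produces the same shape of data that normalInsertStatement() hands to ParsedInsert: a table name plus two parallel lists, columnNames and values.

This is a hypothetical regex-based stand-in, not Cassandra's ANTLR-generated parser. It only handles the simple single-row form used in this article. It also keeps identifiers as written, whereas Cassandra lower-cases unquoted identifiers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical toy parser for "INSERT INTO t (c1, c2) VALUES (v1, v2)".
class InsertParseSketch {
    // Loosely mirrors the data carried by UpdateStatement.ParsedInsert.
    static final class ParsedInsert {
        final String table;
        final List<String> columnNames;
        final List<String> values;
        ParsedInsert(String table, List<String> columnNames, List<String> values) {
            this.table = table;
            this.columnNames = columnNames;
            this.values = values;
        }
    }

    private static final Pattern INSERT = Pattern.compile(
        "(?i)\\s*INSERT\\s+INTO\\s+(\\w+)\\s*\\(([^)]*)\\)\\s*VALUES\\s*\\((.*)\\)\\s*;?\\s*");

    static ParsedInsert parse(String cql) {
        Matcher m = INSERT.matcher(cql);
        if (!m.matches()) {
            throw new IllegalArgumentException("unsupported statement: " + cql);
        }
        List<String> columnNames = new ArrayList<>();
        for (String c : m.group(2).split(",")) {
            columnNames.add(c.trim());
        }
        List<String> values = splitOutsideQuotes(m.group(3));
        if (columnNames.size() != values.size()) {
            throw new IllegalArgumentException("column/value count mismatch");
        }
        return new ParsedInsert(m.group(1), columnNames, values);
    }

    // Split on commas that are not inside single-quoted literals.
    private static List<String> splitOutsideQuotes(String s) {
        List<String> out = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inString = false;
        for (char c : s.toCharArray()) {
            if (c == '\'') {
                inString = !inString;
            }
            if (c == ',' && !inString) {
                out.add(cur.toString().trim());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        out.add(cur.toString().trim());
        return out;
    }
}
```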
Next, continue with statement.prepare(clientState) and look at prepareInternal(). For this example it is the prepareInternal() method of the UpdateStatement class. Note the following snippet:
```java
for (int i = 0; i < columnNames.size(); i++)
{
    ColumnDefinition def = getColumnDefinition(cfm, columnNames.get(i));

    if (def.isClusteringColumn())
        hasClusteringColumnsSet = true;

    Term.Raw value = columnValues.get(i);

    if (def.isPrimaryKeyColumn())
    {
        whereClause.add(new SingleColumnRelation(columnNames.get(i), Operator.EQ, value));
    }
    else
    {
        Operation operation = new Operation.SetValue(value).prepare(cfm, def);
        operation.collectMarkerSpecification(boundNames);
        operations.add(operation);
    }
}
```
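10) This loop splits the inserted values by column type:

- A primary-key column becomes an equality relation (SingleColumnRelation with Operator.EQ) in the WHERE clause.
- Every other value in columnValues is converted into an Operation (Operation.SetValue).

Operation has two important fields, a ColumnDefinition and a Term. The ColumnDefinition holds the metadata of the column being updated, and the Term holds the value to write. Its execute() method combines the two into a Cell and stores it through UpdateParameters.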
Each Operation is added to operations, and the method finally returns new UpdateStatement(...). This completes the analysis of getStatement().
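The loop in prepareInternal() can be reduced to a simple partition of the inserted columns: primary-key columns go into the WHERE clause as equality restrictions, and all other columns become "set value" operations.

The sketch below models that split with plain strings. PrepareInsertSketch, SetValue, and Prepared are hypothetical stand-ins for Cassandra's ColumnDefinition, SingleColumnRelation, and Operation.SetValue. The caller supplies which columns form the partition key and clustering key, because the article does not show the USER table's schema.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the column split performed in prepareInternal().
class PrepareInsertSketch {
    // Plays the role of Operation.SetValue: a column plus the term to write.
    static final class SetValue {
        final String column;
        final String term;
        SetValue(String column, String term) {
            this.column = column;
            this.term = term;
        }
    }

    static final class Prepared {
        final Map<String, String> whereClause = new LinkedHashMap<>(); // column = value
        final List<SetValue> operations = new ArrayList<>();
        boolean hasClusteringColumnsSet = false;
    }

    static Prepared prepare(List<String> columnNames, List<String> values,
                            Set<String> partitionKey, Set<String> clustering) {
        Prepared p = new Prepared();
        for (int i = 0; i < columnNames.size(); i++) {
            String name = columnNames.get(i);
            if (clustering.contains(name)) {
                p.hasClusteringColumnsSet = true;
            }
            if (partitionKey.contains(name) || clustering.contains(name)) {
                // Like new SingleColumnRelation(column, Operator.EQ, value)
                p.whereClause.put(name, values.get(i));
            } else {
                // Like new Operation.SetValue(value).prepare(cfm, def)
                p.operations.add(new SetValue(name, values.get(i)));
            }
        }
        return p;
    }
}
```

For example, with a hypothetical PRIMARY KEY (uid, group_id), the INSERT from step 1 would put uid and group_id into the WHERE clause and leave a single operation for nick.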
Next, follow the execution of processStatement(prepared, queryState, options, queryStartNanoTime) down to executeWithoutCondition(queryState, options, queryStartNanoTime):
```java
private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime)
throws RequestExecutionException, RequestValidationException
{
    ConsistencyLevel cl = options.getConsistency();
    if (isCounter())
        cl.validateCounterForWrite(cfm);
    else
        cl.validateForWrite(cfm.ksName);

    Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryStartNanoTime);
    if (!mutations.isEmpty())
        StorageProxy.mutateWithTriggers(mutations, cl, false, queryStartNanoTime);

    return null;
}
```
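11) In this method, the two calls to focus on are getMutations() and StorageProxy.mutateWithTriggers(). Following getMutations() first, look at addUpdates(collector, options, local, now, queryStartNanoTime).
addUpdates() makes two important calls: PartitionUpdate upd = collector.getPartitionUpdate(cfm, dk, options.getConsistency()) and addUpdateForKey(upd, clustering, params).
getPartitionUpdate() creates the Mutation for the partition key and adds a PartitionUpdate to it. addUpdateForKey() adds the data collected in the UpdateParameters to the PartitionUpdate's BTree.Builder, which is used to build the BTree. At this point the mutation is prepared, and the next step is to apply the update.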
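The structure getMutations() builds is easier to picture as nested maps: one mutation per partition key, holding one partition update per table, whose rows are kept sorted by clustering key.

The model below is hypothetical and much simplified. It is not Cassandra's UpdatesCollector. A TreeMap stands in for the BTree that PartitionUpdate builds with BTree.Builder, and clustering keys are plain strings, so they sort lexicographically.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of what getMutations() collects.
class UpdatesCollectorSketch {
    // partition key -> table name -> clustering key -> (column -> value)
    final Map<String, Map<String, NavigableMap<String, Map<String, String>>>> mutations = new HashMap<>();

    // Like getPartitionUpdate(): find or create the mutation, then its partition update.
    NavigableMap<String, Map<String, String>> getPartitionUpdate(String table, String partitionKey) {
        return mutations.computeIfAbsent(partitionKey, k -> new HashMap<>())
                        .computeIfAbsent(table, t -> new TreeMap<>());
    }

    // Like addUpdateForKey(): add one row's cells under its clustering key.
    void addUpdateForKey(NavigableMap<String, Map<String, String>> upd, String clustering,
                         String column, String value) {
        upd.computeIfAbsent(clustering, c -> new HashMap<>()).put(column, value);
    }
}
```

Grouping by partition key means every change to one partition on one table ends up in a single PartitionUpdate. That PartitionUpdate is then applied as a unit on each replica.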
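Now follow StorageProxy.mutateWithTriggers(mutations, cl, false, queryStartNanoTime) into StorageProxy.sendToHintedEndpoints(). When the coordinator is itself one of the replicas, this calls performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler). The part to watch here is mutation::apply.
Stepping into apply() leads to Keyspace.applyInternal(), which:
- calls CommitLog.instance.add(mutation) to append the mutation to the commit log;
- calls ColumnFamilyStore.apply() to write it to the memtable.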
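The ordering in applyInternal() is the essence of Cassandra's local write path: log first for durability, then update the in-memory structure. The sketch below models only that ordering.

WritePathSketch is hypothetical. A list stands in for the commit log and a ConcurrentSkipListMap for the memtable. Commit log segments, sync policies, timestamps, and flushing to SSTables are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch of the ordering in Keyspace.applyInternal().
class WritePathSketch {
    final List<String> commitLog = new ArrayList<>(); // stand-in for commit log segments
    // Partitions kept sorted by key here; Cassandra actually orders them by token.
    final ConcurrentSkipListMap<String, Map<String, String>> memtable = new ConcurrentSkipListMap<>();

    synchronized void apply(String partitionKey, Map<String, String> cells) {
        // 1) Like CommitLog.instance.add(mutation): record the change before touching memory.
        commitLog.add(partitionKey + " " + cells);
        // 2) Like ColumnFamilyStore.apply(): merge the cells into the partition.
        //    Later writes simply overwrite here; Cassandra reconciles cells by timestamp.
        memtable.computeIfAbsent(partitionKey, k -> new HashMap<>()).putAll(cells);
    }
}
```

If the process crashes after step 1 but before the memtable is flushed, replaying the commit log can rebuild the lost in-memory state. That is why the log write comes first.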