Cassandra Insert流程解析

举报
geminidb_fans 发表于 2020/04/04 22:39:29 2020/04/04
【摘要】 本文基于Cassandra 3.11版本原生代码,解析Insert流程的处理过程1)从 datastax 的官方 driver 开始,代码如下:123456781 public static void main(String[] args) { 2 String query = "INSERT INTO USER (uid, group_id, nick) VAL...

 本文基于Cassandra 3.11版本原生代码,解析Insert流程的处理过程

1)从 datastax 的官方 driver 开始,代码如下:

1
2
3
4
5
6
7
8
1    public static void main(String[] args) {    
2        String query = "INSERT INTO USER (uid, group_id, nick) VALUES(1, 1, 'ram_test');";    
3        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();    
4        Session session = cluster.connect();    
5        session.execute("USE tp");    
6        session.execute(query);    
7        System.out.println("query executed");    
8    }

2)以上代码是 插入一条数据到 Cassandra 中的示例,看看它是如何一步步写Cassandra中的。调用 session.execute(query) 时,driver 为cql 创建一个 SimpleStatement 对象,然后调用 this.executeAsync(statement) 去执行cql。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
01    public ResultSetFuture executeAsync(final Statement statement) {    
02        if (this.isInit) {    
03            DefaultResultSetFuture future = new DefaultResultSetFuture(thisthis.cluster.manager.protocolVersion(), this.makeRequestMessage(statement, (ByteBuffer)null));    
04            (new RequestHandler(this, future, statement)).sendRequest();    
05            return future;    
06        else {    
07            final ChainedResultSetFuture chainedFuture = new ChainedResultSetFuture();    
08            this.initAsync().addListener(new Runnable() {    
09                public void run() {    
10                    DefaultResultSetFuture actualFuture = new DefaultResultSetFuture(SessionManager.this, SessionManager.this.cluster.manager.protocolVersion(), SessionManager.this.makeRequestMessage(statement, (ByteBuffer)null));    
11                    SessionManager.this.execute(actualFuture, statement);    
12                    chainedFuture.setSource(actualFuture);    
13                }    
14            }, this.executor());    
15            return chainedFuture;    
16        }    
17    }

3)关注 makeRequestMessage 方法来了解如何构造请求信息的,根据 Protocol 生产 Request (继承自 Message),通过 tcp 将请求发送至 Cassandra。

下面来看一下Cassandra 是如何接受消息并处理的(tcp 通信使用Netty框架)。

Message.java (package org.apache.cassandra.transport)中的 Dispatcher 类是处理 tcp 请求的入口,ProtocolDecoder类是Message解密器,实例化Message 类;channelRead0() 消息处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
01    try    
02    {    
03        assert request.connection() instanceof ServerConnection;    
04        connection = (ServerConnection)request.connection();    
05        if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))    
06            ClientWarn.instance.captureWarnings();    
07    
08        QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());    
09    
10        logger.trace("Received: {}, v={}", request, connection.getVersion());    
11        response = request.execute(qstate, queryStartNanoTime);    
12        response.setStreamId(request.getStreamId());    
13        response.setWarnings(ClientWarn.instance.getWarnings());    
14        response.attach(connection);    
15        connection.applyStateTransition(request.type, response.type);    
16    }    
17    catch (Throwable t)    
18    {    
19        JVMStabilityInspector.inspectThrowable(t);    
20        UnexpectedChannelExceptionHandler handler = new UnexpectedChannelExceptionHandler(ctx.channel(), true);    
21        flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame()));    
22        return;    
23    }    
24    finally    
25    {    
26        ClientWarn.instance.resetWarnings();    
27    }

4)Request类的execute() 方法为 具体执行 cql 的方法,以本例的 cql 来说,真正的 Message 对象为 QueryMessage 类,下面请移步至 QueryMessage 的 execute方法,跟踪如下调用


1
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);

5)进入 process 方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
01    public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime)    
02        throws RequestExecutionException, RequestValidationException    
03    {    
04        ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());    
05        options.prepare(p.boundNames);    
06        CQLStatement prepared = p.statement;    
07        if (prepared.getBoundTerms() != options.getValues().size())    
08            throw new InvalidRequestException("Invalid amount of bind variables");    
09    
10        if (!queryState.getClientState().isInternal)    
11            metrics.regularStatementsExecuted.inc();    
12    
13        return processStatement(prepared, queryState, options, queryStartNanoTime);    
14    }

6)下面先分析 ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());

1
2
3
4
5
6
7
8
9
10
11
12
13
01    public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)    
02        throws RequestValidationException    
03    {    
04        Tracing.trace("Parsing {}", queryStr);    
05        ParsedStatement statement = parseStatement(queryStr);    
06    
07        // Set keyspace for statement that require login    
08        if (statement instanceof CFStatement)    
09            ((CFStatement)statement).prepareKeyspace(clientState);    
10    
11        Tracing.trace("Preparing statement");    
12        return statement.prepare(clientState);    
13    }

7)先进入QueryProcessor类的parseStatement()方法看cql如何解析,以本例来说,Cql_Parser 类的 cqlStatement() 方法中请看如下代码片段

1
2
3
4
5
6
7
8
9
10
01    case 2 :    
02        // Parser.g:210:7: st2= insertStatement    
03        {    
04        pushFollow(FOLLOW_insertStatement_in_cqlStatement88);    
05        st2=insertStatement();    
06        state._fsp--;    
07    
08         stmt = st2;    
09        }    
10        break;

8)进入insertStatement()方法中查看normalInsertStatement(),此方法中两个局部变量

1
2
1    List<ColumnDefinition.Raw> columnNames  = new ArrayList<>();    
2    List<Term.Raw> values = new ArrayList<>();

9)columnNames 存储要插入的列,values 存储需要插入的值,方法返回 new UpdateStatement.ParsedInsert(cf, attrs, columnNames, values, ifNotExists);

记住 UpdateStatement.ParsedInsert 实例,此时需要插入的字段与值保存在这,到此 ParsedStatement 准备完毕。

继续分析 statement.prepare(clientState),查看 prepareInternal() 方法,对于本例来说是 UpdateStatement 类的 prepareInternal() 方法,关注如下代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
01    for (int i = 0; i < columnNames.size(); i++)    
02    {    
03        ColumnDefinition def = getColumnDefinition(cfm, columnNames.get(i));    
04    
05        if (def.isClusteringColumn())    
06            hasClusteringColumnsSet = true;    
07    
08        Term.Raw value = columnValues.get(i);    
09    
10        if (def.isPrimaryKeyColumn())    
11        {    
12            whereClause.add(new SingleColumnRelation(columnNames.get(i), Operator.EQ, value));    
13        }    
14        else    
15        {    
16            Operation operation = new Operation.SetValue(value).prepare(cfm, def);    
17            operation.collectMarkerSpecification(boundNames);    
18            operations.add(operation);    
19        }    
20    }

10)将columnValues 中的值(需要插入的值),转换成 Operation 类,次类中两个重要属性 ColumnDefinition 和 Term

ColumnDefinition  存储更新的列的 meta 信息,Term 存储要更新的值,execute() 方法将  这个两个属性 组成 Cell 存储到 UpdateParameters 中。

Operation 添加到Operations 中,方法最后 return new UpdateStatement(),到此,getStatement() 方法分析完毕。

下面看 processStatement(prepared, queryState, options, queryStartNanoTime) 方法执行流程,跟踪代码到 executeWithoutCondition(queryState, options, queryStartNanoTime)中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
01    private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime)    
02        throws RequestExecutionException, RequestValidationException    
03    {    
04        ConsistencyLevel cl = options.getConsistency();    
05        if (isCounter())    
06            cl.validateCounterForWrite(cfm);    
07        else    
08            cl.validateForWrite(cfm.ksName);    
09    
10        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryStartNanoTime);    
11        if (!mutations.isEmpty())    
12            StorageProxy.mutateWithTriggers(mutations, cl, false, queryStartNanoTime);    
13    
14        return null;    
15    }

11)本方法中,重点关注 getMutations 和 StorageProxy.mutateWithTriggers(),下面继续跟踪 getMutations(),此方法中查看 addUpdates(collector, options, local, now, queryStartNanoTime)。

两个重要调用 PartitionUpdate upd = collector.getPartitionUpdate(cfm, dk, options.getConsistency()) 、addUpdateForKey(upd, clustering, params);

getPartitionUpdate 方法创建 Mutation 并向其中添加 PartitionUpdate;addUpdateForKey 将 UpdateParameters 参数添加到 PartitionUpdate  的 BTree.Builder 中用来构建 Btree,至此突变准备完成,下面将执行更新操作。

跟踪 StorageProxy.mutateWithTriggers(mutations, cl, false, queryStartNanoTime), 进入 StorageProxy.sendToHintedEndpoints(),跟踪

performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler), 此时需要关注 mutation::apply,进入 apply()方法,

跟踪到 KeySpace.applyInternal() 

  • 调用 CommitLog.instance.add(mutation); 提交日志

  • 调用ColumnFamilyStore.apply() 写入memtable


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。