ceph: separate banner and connect during handshake into distinct stages
authorSage Weil <sage@newdream.net>
Tue, 10 Nov 2009 22:34:36 +0000 (14:34 -0800)
committerSage Weil <sage@newdream.net>
Tue, 10 Nov 2009 22:34:48 +0000 (14:34 -0800)
We need to make sure we only swab the address during the banner once.  So
break process_banner out of process_connect, and clean up the surrounding
code so that these are distinct phases of the handshake.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/messenger.c
fs/ceph/messenger.h

index 5cc374850d8fa5b79e786d94cc9a0e74e386fe27..e389656b2a6b8bd8081d5f29288bfc41425130f6 100644 (file)
@@ -564,10 +564,26 @@ static void prepare_write_keepalive(struct ceph_connection *con)
 /*
  * We connected to a peer and are saying hello.
  */
-static void prepare_write_connect(struct ceph_messenger *msgr,
-                                 struct ceph_connection *con)
+static void prepare_write_banner(struct ceph_messenger *msgr,
+                                struct ceph_connection *con)
 {
        int len = strlen(CEPH_BANNER);
+
+       con->out_kvec[0].iov_base = CEPH_BANNER;
+       con->out_kvec[0].iov_len = len;
+       con->out_kvec[1].iov_base = &msgr->my_enc_addr;
+       con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
+       con->out_kvec_left = 2;
+       con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
+       con->out_kvec_cur = con->out_kvec;
+       con->out_more = 0;
+       set_bit(WRITE_PENDING, &con->state);
+}
+
+static void prepare_write_connect(struct ceph_messenger *msgr,
+                                 struct ceph_connection *con,
+                                 int after_banner)
+{
        unsigned global_seq = get_global_seq(con->msgr, 0);
        int proto;
 
@@ -595,32 +611,14 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        if (test_bit(LOSSYTX, &con->state))
                con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
 
-       con->out_kvec[0].iov_base = CEPH_BANNER;
-       con->out_kvec[0].iov_len = len;
-       con->out_kvec[1].iov_base = &msgr->my_enc_addr;
-       con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
-       con->out_kvec[2].iov_base = &con->out_connect;
-       con->out_kvec[2].iov_len = sizeof(con->out_connect);
-       con->out_kvec_left = 3;
-       con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr) +
-               sizeof(con->out_connect);
-       con->out_kvec_cur = con->out_kvec;
-       con->out_more = 0;
-       set_bit(WRITE_PENDING, &con->state);
-}
-
-static void prepare_write_connect_retry(struct ceph_messenger *msgr,
-                                       struct ceph_connection *con)
-{
-       dout("prepare_write_connect_retry %p\n", con);
-       con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
-       con->out_connect.global_seq =
-               cpu_to_le32(get_global_seq(con->msgr, 0));
-
-       con->out_kvec[0].iov_base = &con->out_connect;
-       con->out_kvec[0].iov_len = sizeof(con->out_connect);
-       con->out_kvec_left = 1;
-       con->out_kvec_bytes = sizeof(con->out_connect);
+       if (!after_banner) {
+               con->out_kvec_left = 0;
+               con->out_kvec_bytes = 0;
+       }
+       con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
+       con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
+       con->out_kvec_left++;
+       con->out_kvec_bytes += sizeof(con->out_connect);
        con->out_kvec_cur = con->out_kvec;
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
@@ -778,6 +776,12 @@ out:
 /*
  * Prepare to read connection handshake, or an ack.
  */
+static void prepare_read_banner(struct ceph_connection *con)
+{
+       dout("prepare_read_banner %p\n", con);
+       con->in_base_pos = 0;
+}
+
 static void prepare_read_connect(struct ceph_connection *con)
 {
        dout("prepare_read_connect %p\n", con);
@@ -829,11 +833,11 @@ static int read_partial(struct ceph_connection *con,
 /*
  * Read all or part of the connect-side handshake on a new connection
  */
-static int read_partial_connect(struct ceph_connection *con)
+static int read_partial_banner(struct ceph_connection *con)
 {
        int ret, to = 0;
 
-       dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
+       dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
 
        /* peer's banner */
        ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
@@ -847,6 +851,16 @@ static int read_partial_connect(struct ceph_connection *con)
                           &con->peer_addr_for_me);
        if (ret <= 0)
                goto out;
+out:
+       return ret;
+}
+
+static int read_partial_connect(struct ceph_connection *con)
+{
+       int ret, to = 0;
+
+       dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
+
        ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
        if (ret <= 0)
                goto out;
@@ -856,6 +870,7 @@ static int read_partial_connect(struct ceph_connection *con)
             le32_to_cpu(con->in_reply.global_seq));
 out:
        return ret;
+
 }
 
 /*
@@ -976,9 +991,9 @@ bad:
        return -EINVAL;
 }
 
-static int process_connect(struct ceph_connection *con)
+static int process_banner(struct ceph_connection *con)
 {
-       dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
+       dout("process_banner on %p\n", con);
 
        if (verify_hello(con) < 0)
                return -1;
@@ -1016,10 +1031,19 @@ static int process_connect(struct ceph_connection *con)
                       sizeof(con->peer_addr_for_me.in_addr));
                addr_set_port(&con->msgr->inst.addr.in_addr, port);
                encode_my_addr(con->msgr);
-               dout("process_connect learned my addr is %s\n",
+               dout("process_banner learned my addr is %s\n",
                     pr_addr(&con->msgr->inst.addr.in_addr));
        }
 
+       set_bit(NEGOTIATING, &con->state);
+       prepare_read_connect(con);
+       return 0;
+}
+
+static int process_connect(struct ceph_connection *con)
+{
+       dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
+
        switch (con->in_reply.tag) {
        case CEPH_MSGR_TAG_BADPROTOVER:
                dout("process_connect got BADPROTOVER my %d != their %d\n",
@@ -1053,7 +1077,7 @@ static int process_connect(struct ceph_connection *con)
                       ENTITY_NAME(con->peer_name),
                       pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
-               prepare_write_connect_retry(con->msgr, con);
+               prepare_write_connect(con->msgr, con, 0);
                prepare_read_connect(con);
 
                /* Tell ceph about it. */
@@ -1071,7 +1095,7 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->out_connect.connect_seq),
                     le32_to_cpu(con->in_connect.connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
-               prepare_write_connect_retry(con->msgr, con);
+               prepare_write_connect(con->msgr, con, 0);
                prepare_read_connect(con);
                break;
 
@@ -1080,19 +1104,17 @@ static int process_connect(struct ceph_connection *con)
                 * If we sent a smaller global_seq than the peer has, try
                 * again with a larger value.
                 */
-               dout("process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
+               dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
                     con->peer_global_seq,
                     le32_to_cpu(con->in_connect.global_seq));
                get_global_seq(con->msgr,
                               le32_to_cpu(con->in_connect.global_seq));
-               prepare_write_connect_retry(con->msgr, con);
+               prepare_write_connect(con->msgr, con, 0);
                prepare_read_connect(con);
                break;
 
        case CEPH_MSGR_TAG_READY:
                clear_bit(CONNECTING, &con->state);
-               if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
-                       set_bit(LOSSYRX, &con->state);
                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
                con->connect_seq++;
                dout("process_connect got READY gseq %d cseq %d (%d)\n",
@@ -1420,9 +1442,11 @@ more:
                if (test_and_clear_bit(STANDBY, &con->state))
                        con->connect_seq++;
 
-               prepare_write_connect(msgr, con);
-               prepare_read_connect(con);
+               prepare_write_banner(msgr, con);
+               prepare_write_connect(msgr, con, 1);
+               prepare_read_banner(con);
                set_bit(CONNECTING, &con->state);
+               clear_bit(NEGOTIATING, &con->state);
 
                con->in_tag = CEPH_MSGR_TAG_READY;
                dout("try_write initiating connect on %p new state %lu\n",
@@ -1521,7 +1545,16 @@ more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
             con->in_base_pos);
        if (test_bit(CONNECTING, &con->state)) {
-               dout("try_read connecting\n");
+               if (!test_bit(NEGOTIATING, &con->state)) {
+                       dout("try_read connecting\n");
+                       ret = read_partial_banner(con);
+                       if (ret <= 0)
+                               goto done;
+                       if (process_banner(con) < 0) {
+                               ret = -1;
+                               goto out;
+                       }
+               }
                ret = read_partial_connect(con);
                if (ret <= 0)
                        goto done;
index e016fa7cf970ae9e3b9e0410e2b76a7212ef5952..80f7e1e94448b1ba79059e6743983a0d4ea09550 100644 (file)
@@ -104,8 +104,8 @@ struct ceph_msg_pos {
  * thread is currently opening, reading or writing data to the socket.
  */
 #define LOSSYTX         0  /* we can close channel or drop messages on errors */
-#define LOSSYRX         1  /* peer may reset/drop messages */
-#define CONNECTING     2
+#define CONNECTING     1
+#define NEGOTIATING    2
 #define KEEPALIVE_PENDING      3
 #define WRITE_PENDING  4  /* we have data ready to send */
 #define QUEUED          5  /* there is work queued on this connection */