memcachedクライアントではまった件(後編)

tamuraです。

spymemcachedでバックアップサーバを使わない方法についてまとめました。

結論

失敗時の動作を再接続しないに定義します。

MemcachedClient client = new MemcachedClient(new DefaultConnectionFactory() {
    @Override
    public FailureMode getFailureMode() {
        return Failure.Cancel;
    }
    }, AddrUtil.getAddresses("host1,host2,host3"));

String message = (String)client.get("key");

調査の流れ

失敗時の制御を行っている箇所を探します。 そのため、接続(コネクション?インスタンス?)を複数持っているクラスを探していきます。

MemcachedClient

MemcachedClientMemcachedConnectionを持っています。 MemcachedConnectionはひとつなので、失敗時の制御はMemcachedClientがやっているわけではなさそうです。

public class MemcachedClient extends SpyObject implements MemcachedClientIF,
    ConnectionObserver {

  // 略

  protected final MemcachedConnection mconn;

  // 略
}

MemcachedConnection

MemcachedConnectionからは#getLocator()を使ってMemcachedNodeのリストを取得できています。 ということは、#getLocator()で取得できるクラスに何かありそうです。

  @Override
  public Collection<SocketAddress> getAvailableServers() {
    ArrayList<SocketAddress> rv = new ArrayList<SocketAddress>();
    for (MemcachedNode node : mconn.getLocator().getAll()) {
      if (node.isActive()) {
        rv.add(node.getSocketAddress());
      }
    }
    return rv;
  }
  public NodeLocator getLocator() {
    return locator;
  }

NodeLocator

NodeLocatorで複数のMemcachedNodeを管理しているようです。

/**
 * Interface for locating a node by hash value.
 */
public interface NodeLocator {

  /**
   * Get the primary location for the given key.
   *
   * @param k the object key
   * @return the QueueAttachment containing the primary storage for a key
   */
  MemcachedNode getPrimary(String k);

  /**
   * Get an iterator over the sequence of nodes that make up the backup
   * locations for a given key.
   *
   * @param k the object key
   * @return the sequence of backup nodes.
   */
  Iterator<MemcachedNode> getSequence(String k);

  /**
   * Get all memcached nodes. This is useful for broadcasting messages.
   */
  Collection<MemcachedNode> getAll();

  /**
   * Create a read-only copy of this NodeLocator.
   */
  NodeLocator getReadonlyCopy();

  /**
   * Update locator status.
   *
   * @param nodes New locator nodes.
   */
  void updateLocator(final List<MemcachedNode> nodes);
}

ただこれを見たところ、#GetPrimary(String k)というメソッドはあるのですが、あくまでMemcachedNodeを取得するだけみたいです。 getputなどのオペレーションを渡すインタフェースにはなっていないみたいです。

ふたたびMemcachedConnection

NodeLocator#getPrimary(String k)でそのキーの値を保存するホスト(MemcachedNode)が得られるので、 そのメソッドを呼び出しているところで何らからのオペレーションがされているはずです。

そしてその何らかのオペレーションが失敗したら次のホスト(MemcachedNode)を決定するはずなので、 NodeLocator#getPrimary(String k)を実行している箇所を見てみます。

  /**
   * Add an operation to a connection identified by the given key.
   *
   * If the {@link MemcachedNode} is active or the {@link FailureMode} is set
   * to retry, the primary node will be used for that key. If the primary
   * node is not available and the {@link FailureMode} cancel is used, the
   * operation will be cancelled without further retry.
   *
   * For any other {@link FailureMode} mechanisms (Redistribute), another
   * possible node is used (only if its active as well). If no other active
   * node could be identified, the original primary node is used and retried.
   *
   * @param key the key the operation is operating upon.
   * @param o the operation to add.
   */
  protected void addOperation(final String key, final Operation o) {
    MemcachedNode placeIn = null;
    MemcachedNode primary = locator.getPrimary(key);

    if (primary.isActive() || failureMode == FailureMode.Retry) {
      placeIn = primary;
    } else if (failureMode == FailureMode.Cancel) {
      o.cancel();
    } else {
      Iterator<MemcachedNode> i = locator.getSequence(key);
      while (placeIn == null && i.hasNext()) {
        MemcachedNode n = i.next();
        if (n.isActive()) {
          placeIn = n;
        }
      }

      if (placeIn == null) {
        placeIn = primary;
        this.getLogger().warn("Could not redistribute to another node, "
          + "retrying primary node for %s.", key);
      }
    }

    assert o.isCancelled() || placeIn != null : "No node found for key " + key;
    if (placeIn != null) {
      addOperation(placeIn, o);
    } else {
      assert o.isCancelled() : "No node found for " + key + " (and not "
        + "immediately cancelled)";
    }
  }

failureModeで失敗時の動作を制御しています。 failureModeFilureMode.Cancelのときにキャンセル動作するようです。

failureModefinalなのでコンストラクタで指定しています。

  public MemcachedConnection(final int bufSize, final ConnectionFactory f,
      final List<InetSocketAddress> a, final Collection<ConnectionObserver> obs,
      final FailureMode fm, final OperationFactory opfactory) throws IOException {

    // 略

    failureMode = fm;

    // 略
  }

DefaultConnectionFactory

MemcachedConnectionのインスタンスを作成している箇所はDefaultConnectionFactoryにあります。

  public MemcachedConnection createConnection(List<InetSocketAddress> addrs)
    throws IOException {
    return new MemcachedConnection(getReadBufSize(), this, addrs,
        getInitialObservers(), getFailureMode(), getOperationFactory());
  }

#getFailureMode()FailureModeを取得して渡しているようです。

  public FailureMode getFailureMode() {
    return DEFAULT_FAILURE_MODE;
  }

  public static final FailureMode DEFAULT_FAILURE_MODE =
      FailureMode.Redistribute;

ここでは固定値のFailureMode.Redistributeが指定されています。

ここをキャンセルにするにはDefaultConnectionFactoryを拡張してこのメソッドを上書きすれば大丈夫です。

まとめ

じっさいはこんなにスマートに探せていません。

結論から逆算するとクラス定義やインタフェース定義を見ればどこで何をしようとしていたのかがわかる作りになっていたんだな~と思います。

関連記事

comments powered by Disqus