Alluxio 客户端逻辑分析
创建客户端
创建一个 Alluxio 客户端的逻辑在类alluxio.client.file.FileSystem当中,一个最简单的创建客户端访问 Alluxio 的代码如下。
1 | AlluxioProperties properties = new AlluxioProperties(); |
接下来先从 FileSystem.Factory.get这个方法开始分析,该方法代码如下
1 | public static FileSystem get(Subject subject, AlluxioConfiguration conf) { |
只有看到这里的逻辑是先根据传入的用户信息和配置获取一个唯一键,然后会尝试从缓存当中获取。如果在缓存中存在会直接返回相应的客户端,反之会调用FileSystem.Factory.create来创建一个新的文件系统客户端。缓存当中使用了引用计数来判断该客户端是否仍然在使用。当客户端的引用变为 0 时会从缓存中释放该客户端。
客户端初始化
在创建客户端时会调用FileSystemContext.create创建该客户端对象的上下文,在创建过程中需要先调用FileSystemContext.init初始化该客户端。
1 | private synchronized void init(ClientContext clientContext, |
在初始化过程中会先创建客户端与 master 和 worker 连接的连接池,如果启用了alluxio.user.metrics.collection.enabled配置,那么还会启动一个后台守护线程定时与 master 节点进行心跳传输监控指标项信息。
另外在初始化客户端时会创建一个负责重新初始化的后台线程,负责定时从 master 拉取配置文件的哈希值,判断 master 节点的配置是否有过修改(例如通过 updateConf 或者 PathConf 等动态修改配置的命令),如果出现哈希值不一致的情况,那么会重新初始化该客户端。在重新初始化的过程中,会阻塞所有的请求直到重新初始化客户端完成。
创建具有缓存功能的客户端
在客户端初始化完成以后,会调用FileSystem.Factory.create创建客户端,其主要代码如下:
1 | public static FileSystem create(FileSystemContext context) { |
可以看到根据不同的配置,客户端当中出现了三种实现,分别是 BaseFileSystem,MetadataCachingBaseFileSystem和LocalCacheFileSystem。其中 MetadataCachingBaseFileSystem和LocalCacheFileSystem都是对于BaseFileSystem的封装,分别提供了元数据和数据缓存的功能,这里先从最简单的BaseFileSystem开始分析,在 BaseFileSystem 可以将调用分为三大类,一种是 getStatus 这样的纯元数据操作,一种是 openfile 这样的读取文件操作,另一种时 createFile 这样的写入文件操作。针对元数据操作,可以直接调用对应的 GRPC 接口(例如listStatus)即可,接下来需要介绍客户端如何与 Master 进行通信以及读取和写入的流程。
与 Master 节点通信
选择 Master 主节点
客户端通过 MasterInquireClient 这个接口获取到主节点的地址,其目前有三个实现,分别是PollingMasterInquireClient,SingleMasterInquireClient和ZkMasterInquireClient,其中 PollingMasterInquireClient 针对使用 Embbed Journal 作为高可用模式下的选择主节点方式,SingleMasterInquireClient 是单节点给高可用 master 节点的选择方式,ZkMasterInquireClient 用于使用 Zookeeper 进行高可用模式下的主节点选择方式。
因为 Alluxio 当中只有主节点会启动 Rpc Server,而其他节点在遇到客户端连接时会直接断开连接,PollingMasterInquireClient 使用到了这一特性,该实现会轮询所有的 master 节点并尝试连接,直到寻找到可以连接的节点。
在知道了主节点之后,客户端会记录下该节点,之后所有的连接都会直接与主节点连接。如果出现了该主节点无法连接的情况时,就会重新调用 PollingMasterInquireClient 的过程连接到新的主节点。
数据读取流程
数据读取的入口位于 BaseFileSystem.openFile 函数,首先会通过 getStatus 向 master 节点获取该文件的元数据,之后会检查是否为目录和是否未写入完成等条件,如果出现上述情况则抛出异常。
寻找合适的 Worker 节点
通过 getStatus 获取的文件信息中包含了该文件所有块的信息,当客户端读取文件时会根据读取位置的偏移量计算出当前需要读取的块编号,并寻找最接近客户端并持有该块的 Worker 节点,从该节点当中读取该块的数据。
判断最接近客户端的Worker 的逻辑位于 BlockLocationUtils.nearest,首先会判断配置中是否启用了alluxio.worker.data.server.domain.socket.address这一配置,该配置用于判断使用 domain socket 进行短路读时每个 Worker 节点使用的 domain socket 路径是否是不一致的,如关闭该项,那么每个 Worker 节点会使用同样的 domain socket 地址,如果开启该项,则每个 worker 会动态的生成一个路径作为 domain socket 的连接地址。Alluxio 会在这里判断每个worker 节点的domain socket 是否是可连接的,如果找到一个 domain socket 可以连接,那么会直接使用该 Worker 作为连接的 Worker 节点。如果没有根据domain socket 信息寻找到最近的 Worker 节点,之后 Alluxio 会根据每个 Worker 节点配置有限寻找与客户端 IP 相同的 Worker 节点 IP。如果仍然没有找到合适的 Worker 节点,会寻找与配置的机架位置相同的 Worker 节点作为选择的 Worker 节点。
1 | public static Optional<Pair<WorkerNetAddress, Boolean>> nearest(TieredIdentity tieredIdentity, |
如果此时仍然没有合适的 Worker 节点,会根据配置项 alluxio.user.ufs.block.read.location.policy 选择一个 Worker 节点进行读取,目前版本的 Alluxio 提供了以下的策略进行 worker 节点的选择。
| 策略名称 | 介绍 |
|---|---|
| DeterministicHashPolicy | 通过 Hash 确定读取的 Worker |
| LocalFirstAvoidEvictionPolicy | 优先通过本地 Worker 进行读取,但是会尝试避免缓存驱逐 |
| LocalFirstPolicy | 优先通过本地 Worker 进行读取 |
| MostAvailableFirstPolicy | 向拥有最多空间的 Worker 节点进行读取 |
| RoundRobinPolicy | 轮询所有 Worker 节点进行读取 |
| SpecificHostPolicy | 只选择指定的一个 Worker 节点 |
根据不同位置的 Worker 节点,客户端会通过不同的方式进行读取,如果客户端与 Worker 节点在同一个节点上,那么会通过短路读直接从本地文件系统当中读取,否则将会通过与 Worker 节点建立 Grpc 通信读取文件。
短路读
如果客户端和数据块在同一节点上,客户端可以直接从本地磁盘读取数据。通过该方式可以使读取性能得到很大的提高。客户端会通过 BlockInStream.createLocalBlockInStream 创建基于短路读的块输入流。
进行短路读取时需要先与本地 Worker 节点进行一次通信,向 Worker 发送一个OpenLocalBlockRequest请求,用于锁定该块不被驱逐或者移动,如果需要移动该块到 Worker 节点存储的最高层,Worker 节点也会在收到请求后移动该块到最高层。在 Worker 节点处理完这些请求后,会向客户端返回该块在本地文件系统的位置,之后客户端就可以直接从本地读取该块的内容。
当读取数据块完成后或者出现异常终止时,也就是在该连接释放时,Worker 就会自动释放针对该块的写入锁。
通过 Grpc 读取
如果无法通过短路读进行读取,那么客户端会回退到使用 Grpc 连接与选中的 Worker 节点进行通信。这里客户端会判断是否可以通过 domain socket 连接到选中的 Worker 节点并优先选择使用 domain socket 的方式与 Worker 进行通信。创建基于 Grpc 的块输入流的代码位于 BlockInStream.createGrpcBlockInStream。
通过 Grpc 进行连接时,每次读取一个 chunk 的大小,并缓存该 chunk,降低调用 RPC 次数来提升性能。chunk 的大小由配置alluxio.user.network.reader.chunk.size.bytes决定。
读取异常处理
当从一个 Worker 读取失败时,会会记录失败的 Worker 节点,然后尝试从其他的 Worker 当中进行读取,直到重试次数达到上限或者没有别的 Worker 可供读取。重试的配置通过alluxio.user.block.read.retry.sleep.base,alluxio.user.block.read.retry.sleep.max和alluxio.user.block.read.retry.max.duration这三个配置进行控制。
异步缓存
如果读取数据时并没有从本地 Worker 节点进行读取,那么客户端会尝试发起异步缓存的请求。如果启用了 alluxio.user.file.passive.cache.enabled配置,且存在本地的 Worker 节点,那么会向本地Worker 节点发起异步缓存当前读取块的请求,否则会向选出的负责读取该块数据的 Worker 节点发起请求。
1 | boolean triggerAsyncCaching(BlockInStream stream) { |
数据写入流程
客户端写入数据时,会先向 master 节点发送一个 CreateFile 请求,master 在这里会判断请求是否合法等要求,然后向客户端返回新创建的文件的基本信息。
之后客户端会判断写入的类型,进行不同的操作。
如果是 THROUGH 或者 CACHE_THROUGH 等需要直接写入到底层文件系统的写入类型,则会,并选择一个 Worker(根据配置 alluxio.user.block.write.location.policy.class 选择)来负责处理写到 UFS 的数据。
如果写入的类型是 MUST_CACHE、CACHE_THROUGH、ASYNC_THROUGH 等需要将写入的数据缓存到 Worker 节点上的写入类型时,需要打开另外一个流负责将每一个写入的块缓存到不同的 Worker 上。
写入 worker 缓存块的流程与读取的流程类似,也会判断写入的 Worker 节点是否与客户端所在节点相同,如果写入的 Worker 与客户端在同一个主机上,将使用短路写的方式直接将块数据写入到 Worker 节点本地而不需要将块数据通过网络发送到 Worker 节点上。
当数据完成写入时,最后客户端向 master 节点发送一个 completeFile 请求,补全该文件的长度等信息,表示该文件已经完成写入。
异常处理
当在写入文件的过程当中失败时,会调用 cancel 接口取消当前流以及之前的所有使用过的输出流。在取消的过程中,会删除所有已经缓存的块和底层存储当中的数据。与读取流程不同,写入失败后不会进行重试。
零拷贝实现
在写入和读取的流程中,由于 WriteRequest 和 ReadResponse 的消息体积会很大,导致拷贝时间长会严重影响性能。可以通过配置 alluxio.user.streaming.zerocopy.enabled 开启零拷贝特性。Alluxio 通过实现了 Grpc 的 MethodDescriptor.Marshaller 和 Drainable 接口来实现 Grpc 零拷贝的特性。
MethodDescriptor.Marshaller 负责对消息序列化和反序列化的抽象,通过该可以 Alluxio 可以自定义消息序列化和反序列化的行为。Drainable 负责对 java.io.InputStream 进行扩展,增加一个将所有内容转移到 OutputStream 的方法。该方法用于优化输入流的内容最终将被写入 OutputStream 的情况。与其通过 read() 将内容复制到一个字节数组,然后再写入 OutputStream,通过该接口可以实现直接将内容写入 OutputStream,避免数据的拷贝。
1 | public abstract class DataMessageMarshaller<T> implements MethodDescriptor.Marshaller<T>, |
总结
阅读客户端的代码对于后续阅读 Alluxio 的其他代码更很有帮助,可以从大体上了解 Alluxio 的体系结构,明白 Alluxio 在读取和写入数据是的数据流向。