内嵌 HTTP 文件服务

2023/12/24 streamparek zio 共 6341 字,约 19 分钟

用于满足 Flink 资源部署机制 中 Kubernetes Pod Init Container 获取额外 Flink JAR 资源的目的。该服务需要满足以下要求:

轻量化,与当前 StreamPark 实例运行在同个 JVM 中;

独立端口,与 StreamPark-Console 外部服务端口相互独立,该服务仅需要 K8s Cluster 内网访达;

实现

streampark-flink-kubernetes-v2/FileServer.scala 基于 ZIO-HTTP Netty 实现,可以内嵌入任意 Java 程序

集成到 StreamPark-Console 的 SpringBoot 启动流程,在 org.apache.streampark.console.core.runner.EnvInitializer 加入以下代码即可:

ZIOExt.unsafeRun(EmbeddedFileServer.launch());

测试类使用示例

user_fileserver

"Launch embedded http file server and mirror files" in unsafeRun {  
  for {  
    _ <- EmbeddedFileServer.launch  
  
    // mirror local file to http filer server which can be any local path.  
    _ <- FileMirror.mirror(s"$assetPath/flink-faker-0.5.3.jar", "test")  
    _ <- FileMirror.mirror(s"$assetPath/quick-sql-1.0.jar", "test")  
  
    // print the http url that corresponds to the accessible file.  
    _ <- FileMirror.getHttpUrl("test", "flink-faker-0.5.3.jar").debug  
    _ <- FileMirror.getHttpUrl("test", "quick-sql-1.0.jar").debug  
    _ <- ZIO.never  
  } yield ()  
    /* OUTPUT:  
     * http://{LAN_IP}:10030/fs/test/flink-faker-0.5.3.jar      http://{LAN_IP}:10030/fs/test/quick-sql-1.0.jar     
    */  
}  
  
"A more simplified example" in unsafeRun {  
  for {  
    _ <- EmbeddedFileServer.launch  
  
    _ <- FileMirror.mirrorAndGetHttpUrl(s"$assetPath/flink-faker-0.5.3.jar", "test2").debug  
    _ <- FileMirror.mirrorAndGetHttpUrl(s"$assetPath/quick-sql-1.0.jar", "test2").debug    
    _ <- ZIO.never  
  } yield ()  
    /* OUTPUT:  
     * http://{LAN_IP}:10030/fs/test2/flink-faker-0.5.3.jar      http://{LAN_IP}:10030/fs/test2/quick-sql-1.0.jar    
     */  
}

具体代码解析

代码结构

├── fs                                        # 内嵌 HTTP 文件服务
│   ├── EmbeddedFileServer.scala              # 文件服务器路由及初始化方法
│   ├── FileMirror.scala                      # 文件服务器具体实现方法
│   ├── FileSer
verPeerAddress.scala           # 文件服务器路径推断工具类
│   └── package.scala                         # fs包通用函数类

fs package类

package object fs {  
  // 获得嵌入式HTTP文件服务器本地镜像目录
  lazy val localMirrorDir: String = InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR) 
  // 获得嵌入式HTTP文件服务器端口 
  lazy val fileServerPort: Int    = InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_PORT)  
}

EmbeddedFileServer类

对外访问路由

  1. 请求健康检查:
    • 请求方法:GET
    • 路径:/health
    • 示例URL:http:///health
  2. 请求文件服务:
    • 请求方法:GET
    • 路径:/fs/{subspace}/{name}
    • 示例URL:http:////fs/subspace/someFile
      private val routes = Http.collectHttp[Request] {  
        // 请求健康检查
        case Method.GET -> Root / "health"               => Handler.ok.toHttp  
        // 获取镜像文件资源的本地文件
        case Method.GET -> Root / "fs" / subspace / name =>  
       Http.fromFileZIO(FileMirror.getLocalFile(subspace, name))  
      }
      

FileMirror类

获得镜像文件的绝对路径的目录

private val mirrorRoot = os.Path(new File(localMirrorDir).getAbsolutePath)

将文件镜像到本地镜像目录

/** Mirror the file to local mirror directory. Return tuple (namespace, file-name). */  
def mirror(srcFilePath: String, subspace: String): IO[Throwable, (String, String)] = ZIO.attemptBlocking {  
  // 获得源文件的绝对路径
  val srcPath  = os.Path(new File(srcFilePath).getAbsolutePath)  
  // 获得文件名
  val fileName = srcPath.last  
  // 复制文件到镜像文件资源目录下
  os.copy(  
    from = srcPath,  
    to = mirrorRoot / subspace / fileName,  
    replaceExisting = true,  
    createFolders = true,  
    mergeFolders = true  
  )  
  // 返回(namespace, file-name)二元组
  subspace -> fileName  
}

获取镜像文件资源的http访问url

/** Get the http access url of the mirrored file resource. */  
def getHttpUrl(subspace: String, name: String): UIO[String] = {  
  for {  
    httpHost <- FileServerPeerAddress.getEnsure  
    url       = s"http://$httpHost:$fileServerPort/fs/$subspace/$name"  
  } yield url  
}

将文件镜像到本地镜像目录并获取地址

def mirrorAndGetHttpUrl(srcFilePath: String, ns: String): ZIO[Any, Throwable, String] =  
  mirror(srcFilePath, ns)  
    .flatMap { case (ns, name) => getHttpUrl(ns, name) }

获取镜像文件资源目录的本地文件

def getLocalFile(subspace: String, name: String): IO[Throwable, File] = {  
  for {  
    // mirrorRoot是镜像文件的绝对路径的目录 
    // (mirrorRoot / subspace / name).toIO 把镜像文件资源目录的本地文件
    // 变成IO流
    localFile <- ZIO.succeed((mirrorRoot / subspace / name).toIO) 
    // 判断文件是否存在 
    _         <- ZIO  
                   .fail(FileNotFound(localFile.getAbsolutePath))  
                   .whenZIO(ZIO.attempt(localFile.exists()).map(!_))  
    // 判断是否是文件格式               
    _         <- ZIO  
                   .fail(NotAFile(localFile.getAbsolutePath))  
                   .whenZIO(ZIO.attempt(localFile.isFile).map(!_))  
  } yield localFile  
}

FileServerPeerAddress类

初始化时自动计算地址

private val address: Ref[Option[String]] = unsafeRun(Ref.make(None))  // 创建一个Ref,用于存储地址信息

private val STREAMPARK_K8S_SVC_NAME = "streampark-service"  // 定义一个常量,表示streampark服务的名称

// 在初始化时自动计算地址。
infer
  // 将推断出的地址包装在Some中
  .map(Some(_))  
   // 将地址设置到Ref中
  .tap(address.set) 
   // 记录日志,表示K8s对等地址
  .tap(addr => ZIO.logInfo(s"Embedded HTTP file server K8s peer address: ${addr.getOrElse("unknown")}"))  
   // 将整个链式操作作为守护线程运行
  .forkDaemon 
   // 在UIO环境中运行整个操作
  .runUIO  

获取快照地址

/** Get the peer communication address snapshot. */  
def get: UIO[Option[String]] = address.get  

获取地址,阻止调用者,直到计算出地址。

/**
  * 获取地址,阻塞调用者直到地址被计算出来。
  *
  * @return 一个 UIO,包含一个字符串,表示获取到的地址
  */
def getEnsure: UIO[String] = 
// 获取地址,并进行flatMap操作
address.get.flatMap {  
// 如果地址为None,延迟100毫秒后再次尝试获取
  case None       => getEnsure.delay(100.millis)  
  // 如果地址存在,直接返回
  case Some(addr) => ZIO.succeed(addr)  
}

重新获取并刷新通信地址

def refresh: UIO[Unit] = infer.tap(r => address.set(Some(r))).unit

推断k8s资源的相对文件服务地址

/**
  * 推断出Pod内部的信息。
  *
  * @return 一个 UIO,包含一个可选的字符串,表示从Pod内部推断出的信息
  */
private def inferInsidePod: UIO[Option[String]] =  
// 使用Kubernetes客户端
  usingK8sClient { client =>  
    Option(client.getNamespace).flatMap { ns =>  
    // 获取命名空间,并进行flatMap操作
      Option(  
        client.services  
          .inNamespace(ns)  
          // 获取指定名称的服务
          .withName(STREAMPARK_K8S_SVC_NAME)  
          .get()  
      )
      // 如果服务存在,返回服务名称和命名空间的组合
      .map(_ => s"$STREAMPARK_K8S_SVC_NAME.$ns")  
    }  
  }.catchAll(_ => ZIO.succeed(None))  // 如果获取失败,返回None

从k8s Api服务器推断Socket应答

/**
  * 从Kubernetes API服务器推断出套接字回复
  *
  * @return 一个 UIO,包含一个可选的字符串,表示从Kubernetes API服务器推断出的套接字回复
  */
private def inferSocketReplyFromK8sApiServer: UIO[Option[String]] =  
  ZIO  
    .attemptBlocking {  
    // 获取Kubernetes API服务器的主机地址
      val masterUrl = newK8sClient.getConfiguration.getMasterUrl  

      extractHostPortFromUrl(masterUrl).flatMap { case (host, port) =>            // 从主机地址中提取主机和端口
        // 使用新的套接字
        Using(new Socket()) { socket =>  
        // 连接到主机和端口
          socket.connect(new InetSocketAddress(host, port))     
        // 获取本地套接字地址的主机地址                  
         socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress].getAddress.getHostAddress  
        }
        // 将结果转换为Option
        .toOption  
      }  
    }  
    .catchAll(_ => ZIO.succeed(None))  // 如果获取失败,返回None


直接本地主机地址

/**
  * 获取本地主机的IP地址。
  *
  * @return 一个 UIO,包含一个可选的字符串,表示本地主机的IP地址
  */
private def directLocalHost: UIO[Option[String]] =  
  ZIO  
    // 尝试在阻塞上下文中获取本地主机的IP地址
    .attemptBlocking(InetAddress.getLocalHost.getHostAddress)  
    // 如果成功获取,将结果包装在Some中
    .map(Some(_))        
    // 如果获取失败,返回None
    .catchAll(_ => ZIO.succeed(None))  

从Url中提取主机端口

/**
  * 从给定的URL中提取主机和端口。
  *
  * @param url 要提取主机和端口的URL
  * @return 如果成功提取,则包含主机和端口的元组的选项,否则为None
  */
private def extractHostPortFromUrl(url: String): Option[(String, Int)] = {
  // 通过 "://" 分割URL
  val p1 = url.split("://")
  // 如果分割结果不包含两部分,则返回None
  if (p1.length != 2) None
  else {
    // 提取协议部分
    val protocol = p1(0)
    // 通过 "/" 分割剩余部分,然后再通过 ":" 提取主机和端口
    val p2 = p1(1).split("/").head.split(":")
    // 如果找到主机和端口,作为Some返回
    if (p2.length == 2) Some(p2(0) -> p2(1).toInt)
    // 如果只找到主机,根据协议确定默认端口并作为Some返回
    else if (p2.length == 1) protocol match {
      case "http"  => Some(p2(0) -> 80)
      case "https" => Some(p2(0) -> 443)
      case _       => None
    }
    // 如果既没有找到主机也没有找到主机+端口,返回None
    else None
  }
}

文档信息

Search

    Table of Contents