用于满足 Flink 资源部署机制 中 Kubernetes Pod Init Container 获取额外 Flink JAR 资源的目的。该服务需要满足以下要求:
轻量化,与当前 StreamPark 实例运行在同个 JVM 中;
独立端口,与 StreamPark-Console 外部服务端口相互独立,该服务仅需要 K8s Cluster 内网访达;
实现
streampark-flink-kubernetes-v2/FileServer.scala 基于 ZIO-HTTP Netty 实现,可以内嵌入任意 Java 程序
集成到 StreamPark-Console 的 SpringBoot 启动流程,在 org.apache.streampark.console.core.runner.EnvInitializer 加入以下代码即可:
ZIOExt.unsafeRun(EmbeddedFileServer.launch());
测试类使用示例
"Launch embedded http file server and mirror files" in unsafeRun {
for {
_ <- EmbeddedFileServer.launch
// mirror local file to http filer server which can be any local path.
_ <- FileMirror.mirror(s"$assetPath/flink-faker-0.5.3.jar", "test")
_ <- FileMirror.mirror(s"$assetPath/quick-sql-1.0.jar", "test")
// print the http url that corresponds to the accessible file.
_ <- FileMirror.getHttpUrl("test", "flink-faker-0.5.3.jar").debug
_ <- FileMirror.getHttpUrl("test", "quick-sql-1.0.jar").debug
_ <- ZIO.never
} yield ()
/* OUTPUT:
* http://{LAN_IP}:10030/fs/test/flink-faker-0.5.3.jar http://{LAN_IP}:10030/fs/test/quick-sql-1.0.jar
*/
}
"A more simplified example" in unsafeRun {
for {
_ <- EmbeddedFileServer.launch
_ <- FileMirror.mirrorAndGetHttpUrl(s"$assetPath/flink-faker-0.5.3.jar", "test2").debug
_ <- FileMirror.mirrorAndGetHttpUrl(s"$assetPath/quick-sql-1.0.jar", "test2").debug
_ <- ZIO.never
} yield ()
/* OUTPUT:
* http://{LAN_IP}:10030/fs/test2/flink-faker-0.5.3.jar http://{LAN_IP}:10030/fs/test2/quick-sql-1.0.jar
*/
}
具体代码解析
代码结构
├── fs # 内嵌 HTTP 文件服务
│ ├── EmbeddedFileServer.scala # 文件服务器路由及初始化方法
│ ├── FileMirror.scala # 文件服务器具体实现方法
│ ├── FileSer
verPeerAddress.scala # 文件服务器路径推断工具类
│ └── package.scala # fs包通用函数类
fs package类
package object fs {
// 获得嵌入式HTTP文件服务器本地镜像目录
lazy val localMirrorDir: String = InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR)
// 获得嵌入式HTTP文件服务器端口
lazy val fileServerPort: Int = InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_PORT)
}
EmbeddedFileServer类
对外访问路由
- 请求健康检查:
- 请求方法:GET
- 路径:/health
- 示例URL:http:///health
- 请求文件服务:
- 请求方法:GET
- 路径:/fs/{subspace}/{name}
- 示例URL:http:////fs/subspace/someFile
private val routes = Http.collectHttp[Request] { // 请求健康检查 case Method.GET -> Root / "health" => Handler.ok.toHttp // 获取镜像文件资源的本地文件 case Method.GET -> Root / "fs" / subspace / name => Http.fromFileZIO(FileMirror.getLocalFile(subspace, name)) }
FileMirror类
获得镜像文件的绝对路径的目录
private val mirrorRoot = os.Path(new File(localMirrorDir).getAbsolutePath)
将文件镜像到本地镜像目录
/** Mirror the file to local mirror directory. Return tuple (namespace, file-name). */
def mirror(srcFilePath: String, subspace: String): IO[Throwable, (String, String)] = ZIO.attemptBlocking {
// 获得源文件的绝对路径
val srcPath = os.Path(new File(srcFilePath).getAbsolutePath)
// 获得文件名
val fileName = srcPath.last
// 复制文件到镜像文件资源目录下
os.copy(
from = srcPath,
to = mirrorRoot / subspace / fileName,
replaceExisting = true,
createFolders = true,
mergeFolders = true
)
// 返回(namespace, file-name)二元组
subspace -> fileName
}
获取镜像文件资源的http访问url
/** Get the http access url of the mirrored file resource. */
def getHttpUrl(subspace: String, name: String): UIO[String] = {
for {
httpHost <- FileServerPeerAddress.getEnsure
url = s"http://$httpHost:$fileServerPort/fs/$subspace/$name"
} yield url
}
将文件镜像到本地镜像目录并获取地址
def mirrorAndGetHttpUrl(srcFilePath: String, ns: String): ZIO[Any, Throwable, String] =
mirror(srcFilePath, ns)
.flatMap { case (ns, name) => getHttpUrl(ns, name) }
获取镜像文件资源目录的本地文件
def getLocalFile(subspace: String, name: String): IO[Throwable, File] = {
for {
// mirrorRoot是镜像文件的绝对路径的目录
// (mirrorRoot / subspace / name).toIO 把镜像文件资源目录的本地文件
// 变成IO流
localFile <- ZIO.succeed((mirrorRoot / subspace / name).toIO)
// 判断文件是否存在
_ <- ZIO
.fail(FileNotFound(localFile.getAbsolutePath))
.whenZIO(ZIO.attempt(localFile.exists()).map(!_))
// 判断是否是文件格式
_ <- ZIO
.fail(NotAFile(localFile.getAbsolutePath))
.whenZIO(ZIO.attempt(localFile.isFile).map(!_))
} yield localFile
}
FileServerPeerAddress类
初始化时自动计算地址
private val address: Ref[Option[String]] = unsafeRun(Ref.make(None)) // 创建一个Ref,用于存储地址信息
private val STREAMPARK_K8S_SVC_NAME = "streampark-service" // 定义一个常量,表示streampark服务的名称
// 在初始化时自动计算地址。
infer
// 将推断出的地址包装在Some中
.map(Some(_))
// 将地址设置到Ref中
.tap(address.set)
// 记录日志,表示K8s对等地址
.tap(addr => ZIO.logInfo(s"Embedded HTTP file server K8s peer address: ${addr.getOrElse("unknown")}"))
// 将整个链式操作作为守护线程运行
.forkDaemon
// 在UIO环境中运行整个操作
.runUIO
获取快照地址
/** Get the peer communication address snapshot. */
def get: UIO[Option[String]] = address.get
获取地址,阻止调用者,直到计算出地址。
/**
* 获取地址,阻塞调用者直到地址被计算出来。
*
* @return 一个 UIO,包含一个字符串,表示获取到的地址
*/
def getEnsure: UIO[String] =
// 获取地址,并进行flatMap操作
address.get.flatMap {
// 如果地址为None,延迟100毫秒后再次尝试获取
case None => getEnsure.delay(100.millis)
// 如果地址存在,直接返回
case Some(addr) => ZIO.succeed(addr)
}
重新获取并刷新通信地址
def refresh: UIO[Unit] = infer.tap(r => address.set(Some(r))).unit
推断k8s资源的相对文件服务地址
/**
* 推断出Pod内部的信息。
*
* @return 一个 UIO,包含一个可选的字符串,表示从Pod内部推断出的信息
*/
private def inferInsidePod: UIO[Option[String]] =
// 使用Kubernetes客户端
usingK8sClient { client =>
Option(client.getNamespace).flatMap { ns =>
// 获取命名空间,并进行flatMap操作
Option(
client.services
.inNamespace(ns)
// 获取指定名称的服务
.withName(STREAMPARK_K8S_SVC_NAME)
.get()
)
// 如果服务存在,返回服务名称和命名空间的组合
.map(_ => s"$STREAMPARK_K8S_SVC_NAME.$ns")
}
}.catchAll(_ => ZIO.succeed(None)) // 如果获取失败,返回None
从k8s Api服务器推断Socket应答
/**
* 从Kubernetes API服务器推断出套接字回复
*
* @return 一个 UIO,包含一个可选的字符串,表示从Kubernetes API服务器推断出的套接字回复
*/
private def inferSocketReplyFromK8sApiServer: UIO[Option[String]] =
ZIO
.attemptBlocking {
// 获取Kubernetes API服务器的主机地址
val masterUrl = newK8sClient.getConfiguration.getMasterUrl
extractHostPortFromUrl(masterUrl).flatMap { case (host, port) => // 从主机地址中提取主机和端口
// 使用新的套接字
Using(new Socket()) { socket =>
// 连接到主机和端口
socket.connect(new InetSocketAddress(host, port))
// 获取本地套接字地址的主机地址
socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress].getAddress.getHostAddress
}
// 将结果转换为Option
.toOption
}
}
.catchAll(_ => ZIO.succeed(None)) // 如果获取失败,返回None
直接本地主机地址
/**
* 获取本地主机的IP地址。
*
* @return 一个 UIO,包含一个可选的字符串,表示本地主机的IP地址
*/
private def directLocalHost: UIO[Option[String]] =
ZIO
// 尝试在阻塞上下文中获取本地主机的IP地址
.attemptBlocking(InetAddress.getLocalHost.getHostAddress)
// 如果成功获取,将结果包装在Some中
.map(Some(_))
// 如果获取失败,返回None
.catchAll(_ => ZIO.succeed(None))
从Url中提取主机端口
/**
* 从给定的URL中提取主机和端口。
*
* @param url 要提取主机和端口的URL
* @return 如果成功提取,则包含主机和端口的元组的选项,否则为None
*/
private def extractHostPortFromUrl(url: String): Option[(String, Int)] = {
// 通过 "://" 分割URL
val p1 = url.split("://")
// 如果分割结果不包含两部分,则返回None
if (p1.length != 2) None
else {
// 提取协议部分
val protocol = p1(0)
// 通过 "/" 分割剩余部分,然后再通过 ":" 提取主机和端口
val p2 = p1(1).split("/").head.split(":")
// 如果找到主机和端口,作为Some返回
if (p2.length == 2) Some(p2(0) -> p2(1).toInt)
// 如果只找到主机,根据协议确定默认端口并作为Some返回
else if (p2.length == 1) protocol match {
case "http" => Some(p2(0) -> 80)
case "https" => Some(p2(0) -> 443)
case _ => None
}
// 如果既没有找到主机也没有找到主机+端口,返回None
else None
}
}
文档信息
- 本文作者:Xuxiaotuan
- 本文链接:https://xuyinyin.cn/2023/12/24/%E5%86%85%E5%B5%8C-HTTP-%E6%96%87%E4%BB%B6%E6%9C%8D%E5%8A%A1/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)