详情页标题前

大家有遇到Flink这问题吗?-云小二-阿里云

详情页1

大家有遇到Flink这问题吗?在1.15.4里报错。Caused by: java.util.concurrent.CompletionException: org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)

以下为热心网友提供的参考意见

这个问题是由于Flink作业图中存在循环依赖导致的。在Flink中,作业图是一个有向无环图(DAG),如果作业图中存在循环依赖,那么Flink将无法正确执行作业。

为了解决这个问题,您需要检查您的Flink作业代码,确保作业图中没有循环依赖。您可以使用Flink的拓扑排序功能来检查作业图是否存在循环依赖。以下是一个使用Java API进行拓扑排序的示例:

import org.apache.flink.api.common.JobVertex;
import org.apache.flink.graph.DefaultEdge;
import org.apache.flink.graph.Graph;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TopologicalSortExample {
    public static void main(String[] args) throws Exception {
        // 创建一个作业图
        Graph<JobVertex, DefaultEdge> graph = new Graph<>();
        Map<JobVertex, Integer> inDegrees = new ConcurrentHashMap<>();

        // 添加作业顶点和边到作业图中
        // ...

        // 计算作业顶点的入度
        for (JobVertex vertex : graph.getVertices()) {
            inDegrees.put(vertex, graph.inDegreeOf(vertex));
        }

        // 拓扑排序
        List<JobVertex> sortedVertices = topologicalSort(graph, inDegrees);

        // 输出排序后的作业顶点
        for (JobVertex vertex : sortedVertices) {
            System.out.println(vertex.getName());
        }
    }

    private static <K, VV, EV> List<K> topologicalSort(Graph<K, EV> graph, Map<K, Integer> inDegrees) {
        List<K> result = new ArrayList<>();
        List<K> zeroInDegreeVertices = new ArrayList<>();

        // 初始化入度为0的顶点列表
        for (K vertex : graph.getVertices()) {
            if (inDegrees.get(vertex) == 0) {
                zeroInDegreeVertices.add(vertex);
            }
        }

        // 执行拓扑排序
        while (!zeroInDegreeVertices.isEmpty()) {
            K currentVertex = zeroInDegreeVertices.remove(0);
            result.add(currentVertex);

            // 更新相邻顶点的入度
            for (K neighbor : graph.getNeighborsOf(currentVertex)) {
                inDegrees.put(neighbor, inDegrees.get(neighbor) - 1);
                if (inDegrees.get(neighbor) == 0) {
                    zeroInDegreeVertices.add(neighbor);
                }
            }
        }

        return result;
    }
}

请根据您的实际作业代码修改上述示例,并确保作业图中没有循环依赖。

转转请注明出处:https://www.yunxiaoer.com/180549.html

(0)
上一篇 2024年1月4日
下一篇 2024年1月4日
详情页2

相关推荐

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。