diff --git a/agent-flow/src/components/base/jadeNode.jsx b/agent-flow/src/components/base/jadeNode.jsx index 813fc0496e..567e8a6db1 100644 --- a/agent-flow/src/components/base/jadeNode.jsx +++ b/agent-flow/src/components/base/jadeNode.jsx @@ -469,7 +469,7 @@ export const jadeNode = (id, x, y, width, height, parent, drawer) => { * @returns {number} 连接数。 */ self.maxNumToLink = () => { - return 1; + return self.graph?.connectionLimitDisabled ? 100 : 1; }; /** diff --git a/agent-flow/src/components/base/validator.js b/agent-flow/src/components/base/validator.js index 861d4f14e8..3301c7641d 100644 --- a/agent-flow/src/components/base/validator.js +++ b/agent-flow/src/components/base/validator.js @@ -35,7 +35,9 @@ export class NormalNodeConnectorValidator extends Validator { validate() { const nextEvents = this.node.getNextRunnableEvents(); const i18n = this.node.graph.i18n; - if (nextEvents.length !== 1) { + const isConnectionLimitDisabled = Boolean(this.node.graph?.connectionLimitDisabled); + const isValid = isConnectionLimitDisabled ? nextEvents.length >= 1 : nextEvents.length === 1; + if (!isValid) { return Promise.reject({ errorFields: [{ errors: [`${i18n?.t('node') ?? 'node'} ${this.node.text} ${i18n?.t('problemWithConnection') ?? 'problemWithConnection'}`], diff --git a/agent-flow/src/components/code/codeNodeState.jsx b/agent-flow/src/components/code/codeNodeState.jsx index b3dfd55968..7faf063c8a 100644 --- a/agent-flow/src/components/code/codeNodeState.jsx +++ b/agent-flow/src/components/code/codeNodeState.jsx @@ -93,7 +93,7 @@ export const codeNodeState = (id, x, y, width, height, parent, drawer) => { * @override */ self.maxNumToLink = () => { - return 10; + return self.graph?.connectionLimitDisabled ? 100 : 10; }; return self; diff --git a/agent-flow/src/components/variableAggregation/variableAggregationNodeState.jsx b/agent-flow/src/components/variableAggregation/variableAggregationNodeState.jsx index 969ab53261..333b13fb39 100644 --- a/agent-flow/src/components/variableAggregation/variableAggregationNodeState.jsx +++ b/agent-flow/src/components/variableAggregation/variableAggregationNodeState.jsx @@ -49,7 +49,7 @@ export const variableAggregationNodeState = (id, x, y, width, height, parent, dr * @override */ self.maxNumToLink = () => { - return 10; + return self.graph?.connectionLimitDisabled ? 100 : 10; }; return self; diff --git a/agent-flow/src/flow/jadeFlowEntry.jsx b/agent-flow/src/flow/jadeFlowEntry.jsx index 155fa45365..72225011ca 100644 --- a/agent-flow/src/flow/jadeFlowEntry.jsx +++ b/agent-flow/src/flow/jadeFlowEntry.jsx @@ -378,6 +378,10 @@ const jadeFlowAgent = (graph) => { graph.destroy(); }; + self.setConnectionLimitDisabled = (disabled) => { + graph.connectionLimitDisabled = Boolean(disabled); + }; + return self; }; @@ -432,6 +436,7 @@ export const JadeFlow = (() => { div, tenant, appId, + connectionLimitDisabled = false, flowConfigData, configs, i18n, @@ -440,7 +445,7 @@ export const JadeFlow = (() => { }) => { const graphDom = getGraphDom(div); const g = jadeFlowGraph(div, 'jadeFlow'); - await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements); + await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled); g.flowType = flowType; const pageData = g.getPageData(0); await g.editFlow(0, graphDom, pageData.id); @@ -470,8 +475,9 @@ export const JadeFlow = (() => { return jadeFlowAgent(g); }; - const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements) => { + const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled = false) => { g.collaboration.mute = true; + g.connectionLimitDisabled = Boolean(connectionLimitDisabled); g.configs = configs; g.i18n = i18n; for (let i = 0; i < importStatements.length; i++) { diff --git a/agent-flow/src/flow/jadeFlowGraph.js b/agent-flow/src/flow/jadeFlowGraph.js index 4ce925df9f..2b5f88db72 100644 --- a/agent-flow/src/flow/jadeFlowGraph.js +++ b/agent-flow/src/flow/jadeFlowGraph.js @@ -76,6 +76,7 @@ export const jadeFlowGraph = (div, title) => { const self = defaultGraph(div, title); self.type = 'jadeFlowGraph'; self.pageType = 'jadeFlowPage'; + self.connectionLimitDisabled = false; self.enableText = false; self.flowMeta = { exceptionFitables: ['modelengine.fit.jober.aipp.fitable.AippFlowExceptionHandler'], diff --git a/agent-flow/src/flow/jadeFlowPage.js b/agent-flow/src/flow/jadeFlowPage.js index f8397cd8ee..4897358347 100644 --- a/agent-flow/src/flow/jadeFlowPage.js +++ b/agent-flow/src/flow/jadeFlowPage.js @@ -40,6 +40,8 @@ export const jadeFlowPage = (div, graph, name, id) => { self.addEventListener('COPY_SHAPE', shapeChangeListener); self.addEventListener('DELETE_SHAPE', shapeChangeListener); + const isConnectionLimitDisabled = () => Boolean(self.graph?.connectionLimitDisabled); + /** * @override */ @@ -305,7 +307,7 @@ export const jadeFlowPage = (div, graph, name, id) => { */ self.canDragOut = (node, connector) => { const lines = self.getEvents().filter(s => s.fromShape === node.id && s.getDefinedFromConnector() === connector); - return lines && lines.length < 1; + return lines.length < (isConnectionLimitDisabled() ? 10 : 1); }; /** @@ -330,7 +332,9 @@ export const jadeFlowPage = (div, graph, name, id) => { } }; - return jadeEvent.fromShape !== jadeEvent.toShape && isConnectorAllowToLink() && isConnectorWithinLimit(); + return jadeEvent.fromShape !== jadeEvent.toShape + && isConnectorAllowToLink() + && (isConnectionLimitDisabled() || isConnectorWithinLimit()); }; /** diff --git a/app-builder/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/domains/business/RunContext.java b/app-builder/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/domains/business/RunContext.java index 8e7cb4d331..3336eedd66 100644 --- a/app-builder/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/domains/business/RunContext.java +++ b/app-builder/plugins/aipp-plugin/src/main/java/modelengine/fit/jober/aipp/domains/business/RunContext.java @@ -91,6 +91,8 @@ public static RunContext from(CreateAppChatRequest request, OperationContext con .orElseGet(() -> CreateAppChatRequest.Context.builder().build()); RunContext runContext = new RunContext(new HashMap<>(), context); runContext.putAllToBusiness(requestContext.getUserContext()); + +// runContext请求操作 runContext.setUseMemory(requestContext.getUseMemory()); runContext.setDimension(requestContext.getDimension()); runContext.setChatId(request.getChatId()); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java index 52b343fc44..b645a3102c 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java @@ -26,6 +26,7 @@ public final class FlowContext extends IdGenerator { /** * 通过from.offer(data)而不是.offer(context)发起的数据会新增一个trace,这个trace会延续到flow end */ + @Getter private final Set traceId; @@ -265,6 +266,29 @@ public List> generate(List data, String position) { return data.stream().map(d -> this.generate(d, position, LocalDateTime.now())).collect(Collectors.toList()); } + /** + * fork一个新的context用于一拖多分支,继承当前context的运行元数据,但生成新的contextId。 + * + * @return 新的分支context + */ + public FlowContext fork() { + return this.convertData(this.data); + } + + /** + * convertData + * + * @param 转换后的数据类型 + * @param data 转换后的数据 + * @return 转换后的context + */ + public FlowContext convertData(R data) { + FlowContext context = this.copyContext(data); + context.previous = this.id; + context.nextPositionId = this.nextPositionId; + return context; + } + /** * 用于when.convert数据时候的转换context,除了包裹的数据类型不一样,所有其他信息都一样 * @@ -274,12 +298,17 @@ public List> generate(List data, String position) { * @return 转换后的context */ public FlowContext convertData(R data, String id) { + FlowContext context = this.copyContext(data); + context.previous = this.previous; + context.id = id; + return context; + } + + private FlowContext copyContext(R data) { FlowContext context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position, this.parallel, this.parallelMode, LocalDateTime.now()); - context.previous = this.previous; context.status = this.status; context.trans = this.trans; - context.id = id; context.batchId = this.batchId; context.toBatch = this.toBatch; context.createAt = this.createAt; diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java index d49466dfd5..2960a94d80 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java @@ -92,16 +92,16 @@ default void update(List> contexts) { } /** - * updateToSent + * 更新context状态为SENT * - * @param contexts contexts + * @param contexts 上下文列表 */ void updateToSent(List> contexts); /** - * updateToReady + * 更新context状态为READY * - * @param contexts contexts + * @param contexts 上下文列表 */ void updateToReady(List> contexts); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java index 4e8c0fb312..796d3cf56f 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java @@ -175,10 +175,14 @@ public FitStream.Publisher convertToFlow(FlowContextRepo rep nodeMap.values().forEach((fromNode) -> { fromNode.setParentFlow(this); Optional.ofNullable(fromNode.getJober()).ifPresent(jober -> jober.setContextRepo(repo)); + fromNode.getEvents().forEach(event -> { // startNode不能出现在event的to属性, endNode不能出现在event的from属性 + FlowNode toNode = nodeMap.get(event.getTo()); + fromNode.subscribe(streamId, flowEnv, toNode, event); + }); }); return getFlowNode(FlowNodeType.START).getPublisher(streamId, repo, messenger, locks); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java index e0266c8d33..5ec4236134 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java @@ -54,7 +54,7 @@ public class FlowStateNode extends FlowNode { */ @Override public FitStream.Processor getProcessor(String streamId, FlowContextRepo repo, - FlowContextMessenger messenger, FlowLocks locks) { + FlowContextMessenger messenger, FlowLocks locks) { if (!Optional.ofNullable(this.processor).isPresent()) { Node node = new Node<>(streamId, this.metaId, this::stateProduce, repo, messenger, locks, this.type); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java index 8afe281bad..799969cb22 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -322,26 +323,64 @@ public void offer(List> contexts, Consumer // qualifiedWhens表示的与from节点连接的所有事件,条件节点符合条件的事件在这里筛选,在事件上处理需要下发的context java.util.Map, List>> matchedContexts = new LinkedHashMap<>(); Set> matchedContextSet = new HashSet<>(); - qualifiedWhens.forEach( - w -> { - List> afterContexts = contexts - .stream() - .filter(c -> w.getWhether().is(c)) - .peek(c -> c.setNextPositionId(w.getId())) - .collect(Collectors.toList()); - matchedContexts.put(w, afterContexts); - matchedContextSet.addAll(afterContexts); + List> forkedContexts = new ArrayList<>(); + for (FlowContext contextItem : contexts) { + List> matchedSubscriptions = qualifiedWhens.stream() + .filter(w -> w.getWhether().is(contextItem)) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(matchedSubscriptions)) { + continue; + } + matchedContextSet.add(contextItem); + for (int index = 0; index < matchedSubscriptions.size(); index++) { + FitStream.Subscription matchedSubscription = matchedSubscriptions.get(index); + FlowContext branchContext = index == 0 ? contextItem : contextItem.fork(); + branchContext.setNextPositionId(matchedSubscription.getId()); + matchedContexts.computeIfAbsent(matchedSubscription, key -> new ArrayList<>()).add(branchContext); + if (index > 0) { + forkedContexts.add(branchContext); } - ); + } + } + qualifiedWhens.forEach(w -> matchedContexts.computeIfAbsent(w, key -> new ArrayList<>())); List> unMatchedContexts = contexts .stream() .filter(c -> !matchedContextSet.contains(c)) .collect(Collectors.toList()); PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, unMatchedContexts); preSendCallback.accept(callbackInfo); + persistForkedContexts(forkedContexts, matchedContexts); matchedContexts.forEach(FitStream.Subscription::cache); } + private void persistForkedContexts(List> forkedContexts, + java.util.Map, List>> matchedContexts) { + if (CollectionUtils.isEmpty(forkedContexts)) { + return; + } + Set forkedIds = forkedContexts.stream().map(FlowContext::getId).collect(Collectors.toSet()); + List> effectiveForkedContexts = matchedContexts.values() + .stream() + .flatMap(List::stream) + .filter(context -> forkedIds.contains(context.getId())) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(effectiveForkedContexts)) { + return; + } + Set traces = effectiveForkedContexts.stream() + .flatMap(context -> context.getTraceId().stream()) + .collect(Collectors.toSet()); + Lock lock = this.locks.getDistributedLock(this.locks.streamNodeLockKey(this.streamId, this.id, + "ForkContextPool")); + lock.lock(); + try { + this.repo.updateContextPool(effectiveForkedContexts, traces); + this.repo.save(effectiveForkedContexts); + } finally { + lock.unlock(); + } + } + /** * 是否有publisher目标 * 用于stream闭环时将没有subscribed的publisher关闭到close subscriber diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java index c7e783a117..2f0a437ebf 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java @@ -27,6 +27,7 @@ import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Blocks; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Retryable; import modelengine.fit.waterflow.flowsengine.utils.FlowExecutors; +import modelengine.fit.waterflow.flowsengine.utils.FlowUtil; import modelengine.fit.waterflow.flowsengine.utils.PriorityThreadPool; import modelengine.fitframework.log.Logger; import modelengine.fitframework.util.CollectionUtils; @@ -34,13 +35,7 @@ import modelengine.fitframework.util.StringUtils; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -132,6 +127,12 @@ public class To extends IdGenerator implements FitStream.Subscriber private Processors.Validator validator = (i, all) -> true; + private FanInMode fanInMode = FanInMode.ANY; + + private boolean fanInModeConfigured = false; + + private Processors.Map, String> mergeKeyGenerator = this::defaultMergeKey; + private Blocks.Block block = null; private Processors.Filter preFilter = null; @@ -389,6 +390,7 @@ public synchronized void accept(ProcessType type, List> contexts) if (type == ProcessType.PROCESS && (processT == null || !processRunning)) { processRunning = true; String threadName = getThreadName(PROCESS_T_NAME_PREFIX); + processT = new Thread(this::process, threadName); processT.setUncaughtExceptionHandler((tr, ex) -> LOG.error(tr.getName() + " : " + ex.getMessage())); processT.start(); @@ -546,6 +548,13 @@ public Processors.Filter postFilter() { return Optional.ofNullable(this.postFilter).orElseGet(this::defaultFilter); } + private Processors.Filter requestFilter(Processors.Filter fallbackFilter) { + if (!FanInMode.ALL.equals(this.fanInMode)) { + return fallbackFilter; + } + return this::selectReadyMergeGroup; + } + /** * defaultFilter * @@ -567,6 +576,90 @@ public void setValidator(Processors.Validator validator) { } } + public void setFanInMode(FanInMode fanInMode) { + this.fanInMode = Optional.ofNullable(fanInMode).orElse(FanInMode.ANY); + this.fanInModeConfigured = true; + } + + public void setMergeKeyGenerator(Processors.Map, String> mergeKeyGenerator) { + this.mergeKeyGenerator = Optional.ofNullable(mergeKeyGenerator).orElse(this::defaultMergeKey); + } + + private String defaultMergeKey(FlowContext context) { + String rootId = Optional.ofNullable(context.getRootId()).orElse(""); + String transId = Optional.ofNullable(context.getTrans()).map(trans -> trans.getId()).orElse(""); + String traceIds = context.getTraceId().stream().sorted().collect(Collectors.joining(",")); + return StringUtils.join("|", rootId, transId, traceIds); + } + + private String buildMergeKey(FlowContext context) { + try { + String mergeKey = this.mergeKeyGenerator.process(ObjectUtils.cast(context)); + if (StringUtils.isNotEmpty(mergeKey)) { + return mergeKey; + } + } catch (Exception exception) { + LOG.warn("build merge key failed for context: {}", context.getId(), exception); + } + return defaultMergeKey(context); + } + + private List> filterReadyByFanIn(List> candidates) { + if (CollectionUtils.isEmpty(candidates)) { + return Collections.emptyList(); + } + if (FanInMode.ANY.equals(this.fanInMode)) { + return candidates; + } + + long expectedInputs = this.froms.stream().map(Identity::getId).distinct().count(); + if (expectedInputs <= 1) { + return candidates; + } + + Map>> grouped = candidates.stream().collect(Collectors.groupingBy(this::buildMergeKey)); + Set qualifiedMergeKeys = grouped.entrySet() + .stream() + .filter(entry -> entry.getValue() + .stream() + .map(FlowContext::getPosition) + .filter(StringUtils::isNotEmpty) + .distinct() + .count() >= expectedInputs) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + return candidates.stream().filter(context -> qualifiedMergeKeys.contains(buildMergeKey(context))).collect( + Collectors.toList()); + } + + private List> selectReadyMergeGroup(List> candidates) { + if (CollectionUtils.isEmpty(candidates)) { + return Collections.emptyList(); + } + long expectedInputs = this.froms.stream().map(Identity::getId).distinct().count(); + if (expectedInputs <= 1) { + return candidates; + } + Map>> grouped = new LinkedHashMap<>(); + candidates.forEach(context -> grouped.computeIfAbsent(buildMergeKey(context), key -> new ArrayList<>()).add( + context)); + return grouped.values() + .stream() + .filter(group -> group.stream() + .map(FlowContext::getPosition) + .filter(StringUtils::isNotEmpty) + .distinct() + .count() >= expectedInputs) + .findFirst() + .orElseGet(Collections::emptyList); + } + + private List> markReady(List> contexts) { + this.introduceToProcess(contexts); + return contexts.stream().filter(context -> context.getStatus() == FlowNodeStatus.READY).collect( + Collectors.toList()); + } + public ProcessMode getProcessMode() { return this.processMode; } @@ -574,6 +667,10 @@ public ProcessMode getProcessMode() { @Override public void onSubscribe(FitStream.Subscription subscription) { this.froms.add(subscription); // 将该节点的from的event加入 + if (!this.fanInModeConfigured) { + long fromCount = this.froms.stream().map(Identity::getId).distinct().count(); + this.fanInMode = fromCount > 1 ? FanInMode.ALL : FanInMode.ANY; + } } @Override @@ -592,13 +689,14 @@ public void onProcess(List> pre) { this.afterProcess(pre, new ArrayList<>()); return; } + List> processInputs = mergeProcessInputs(pre); if (this.isAsyncJob) { beforeAsyncProcess(pre); - this.getProcessMode().process(this, pre); + this.getProcessMode().process(this, processInputs); return; } logFileTest(this, "before", pre); - List> after = this.getProcessMode().process(this, pre); + List> after = this.getProcessMode().process(this, processInputs); logFileTest(this, "after", pre); if (!isOwnTrace(pre)) { LOG.warn("[AfterProcess] The trace is not belong to this node, traceId={}.", @@ -631,6 +729,48 @@ public void setFailed(List> pre, Exception ex) { Optional.ofNullable(this.globalErrorHandler).ifPresent(handler -> handler.handle(ex, retryable, pre)); } + private List> mergeProcessInputs(List> pre) { + if (!FanInMode.ALL.equals(this.fanInMode) || pre.size() <= 1) { + return pre; + } + if (!(ProcessMode.MAPPING.equals(this.processMode) + || ProcessMode.FLATMAPPING.equals(this.processMode) + || ProcessMode.PRODUCING.equals(this.processMode))) { + return pre; + } + if (pre.stream().anyMatch(context -> !(context.getData() instanceof FlowData))) { + return pre; + } + FlowContext baseContext = pre.get(0); + FlowData mergedFlowData = mergeFlowData(pre); + return Collections.singletonList(baseContext.convertData(ObjectUtils.cast(mergedFlowData), baseContext.getId())); + } + + private FlowData mergeFlowData(List> pre) { + FlowData first = ObjectUtils.cast(pre.get(0).getData()); + Map businessData = new HashMap<>(Optional.ofNullable(first.getBusinessData()).orElseGet(HashMap::new)); + Map contextData = new HashMap<>(Optional.ofNullable(first.getContextData()).orElseGet(HashMap::new)); + Map passData = new HashMap<>(Optional.ofNullable(first.getPassData()).orElseGet(HashMap::new)); + + pre.stream().skip(1).map(FlowContext::getData).map(ObjectUtils::cast).forEach(flowData -> { + businessData.putAll(FlowUtil.mergeMaps(businessData, + Optional.ofNullable(flowData.getBusinessData()).orElseGet(HashMap::new))); + contextData.putAll(FlowUtil.mergeMaps(contextData, + Optional.ofNullable(flowData.getContextData()).orElseGet(HashMap::new))); + passData.putAll(FlowUtil.mergeMaps(passData, + Optional.ofNullable(flowData.getPassData()).orElseGet(HashMap::new))); + }); + return FlowData.builder() + .operator(first.getOperator()) + .startTime(first.getStartTime()) + .businessData(businessData) + .contextData(contextData) + .passData(passData) + .errorMessage(first.getErrorMessage()) + .errorInfo(first.getErrorInfo()) + .build(); + } + private boolean isOwnTrace(List> pre) { return pre.get(0).getTraceId().stream().allMatch(traceId -> { if (!repo.getTraceOwnerService().isOwn(traceId)) { @@ -907,7 +1047,8 @@ private List processData(To to, List> conte @Override protected List> requestAll(To to) { return to.repo.requestProducingContext(to.streamId, - to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.postFilter()); + to.froms.stream().map(Identity::getId).collect(Collectors.toList()), + to.requestFilter(to.postFilter())); } }, REDUCING { @@ -935,7 +1076,8 @@ public List> process(To to, List List> requestAll(To to) { return to.repo.requestMappingContext(to.streamId, - to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.defaultFilter(), + to.froms.stream().map(Identity::getId).collect(Collectors.toList()), + to.requestFilter(to.defaultFilter()), to.validator); } }, @@ -1063,10 +1205,8 @@ private List> requestReady(To to) { * @return List */ private List> filterReady(To to, List> pre) { - to.introduceToProcess(pre); - return pre.stream() - .filter(context -> context.getStatus() == FlowNodeStatus.READY) - .collect(Collectors.toList()); + List> grouped = to.filterReadyByFanIn(pre); + return to.markReady(grouped); } /** @@ -1098,9 +1238,24 @@ private void handleProcessConcurrentConflict(To to) { if (CollectionUtils.isEmpty(pending) || to.inParallelMode(pending)) { return; } + List> ready = filterReady(to, pending); + if (CollectionUtils.isEmpty(ready)) { + return; + } LOG.info("[{}] process thread conflict happens for stream-id: {}, node-id: {}", to.getThreadName(To.PROCESS_T_NAME_PREFIX), to.streamId, to.id); to.accept(ProcessType.PROCESS, pending); } } + + + /* + 多个数据到达后采用的处理方式 + * */ + public enum FanInMode { +// 表示即到即用 + ANY, +// 表示所有数据全部到达之后才可以使用 + ALL + } } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java index 0aa62253e2..161c5e19d1 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java @@ -61,7 +61,7 @@ public class When extends IdGenerator implements FitStream.Subscription When(String streamId, FitStream.Subscriber to, Processors.Map converter, Processors.Whether whether, FlowContextRepo repo, - FlowContextMessenger messenger) { + FlowContextMessenger messenger) { this.streamId = streamId; this.converter = converter == null ? input -> (O) input : converter; this.whether = whether == null ? i -> true : whether; @@ -83,7 +83,7 @@ public When(String streamId, FitStream.Subscriber to, Processors.Map When(String streamId, String eventId, FitStream.Subscriber to, Processors.Map converter, Processors.Whether whether, - FlowContextRepo repo, FlowContextMessenger messenger) { + FlowContextRepo repo, FlowContextMessenger messenger) { this(streamId, to, converter, whether, repo, messenger); this.id = eventId; } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java index a59fe15b3d..8b0df6eba2 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java @@ -163,7 +163,7 @@ protected Node(String streamId, Processors.Map, R> processor, Flo * @param locks 流程锁 * @return From */ - private From initFrom(FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { + protected From initFrom(FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { From from = new From<>(this.getStreamId(), repo, messenger, locks); // node里的from跟随subscriber的streamId from.setId(this.getId()); return from; diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java index 0cf287c7b7..a767b0858b 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java @@ -25,6 +25,6 @@ public class ParallelNodeRule implements NodeRule { */ @Override public void apply(FlowNode flowNode) { - Validation.same(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, exception("parallel node event size")); + Validation.greaterThanOrEquals(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, exception("parallel node event size")); } } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java index 53052941fb..fc4309cbeb 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java @@ -27,7 +27,7 @@ public class StartNodeRule implements NodeRule { */ @Override public void apply(FlowNode flowNode) { - Validation.same(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, + Validation.greaterThanOrEquals(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, () -> new WaterflowParamException(INVALID_START_NODE_EVENT_SIZE)); validateNull(flowNode.getJober(), "start node jober should be null"); validateTriggerMode(flowNode, "start node trigger mode"); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java index 513b418795..ab2c3baf11 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java @@ -27,7 +27,7 @@ public class StateNodeRule implements NodeRule { */ @Override public void apply(FlowNode flowNode) { - Validation.same(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, + Validation.greaterThanOrEquals(flowNode.getEvents().size(), MINIMUM_EVENT_SIZE, () -> new WaterflowParamException(INVALID_STATE_NODE_EVENT_SIZE)); if (!flowNode.getTriggerMode().isAuto()) { Validation.notNull(flowNode.getTask(), exception("Flow node task error")); diff --git a/docker/dev-app-builder.sh b/docker/dev-app-builder.sh index 8e93e5b7f3..41f0fad53f 100644 --- a/docker/dev-app-builder.sh +++ b/docker/dev-app-builder.sh @@ -44,6 +44,7 @@ docker cp "$PLUGINS_DIR"/. app-builder-tmp:/opt/fit-framework/plugins/ echo "Copying shared libraries..." docker cp "$SHARED_DIR"/. app-builder-tmp:/opt/fit-framework/shared/ +docker exec app-builder-tmp bash -c "rm -f /opt/fit-framework/plugins/authentication-oauth2-client-1.0.0-SNAPSHOT.jar" # Commit as development version echo "Committing development version image: ${DEV_VERSION}" docker commit --change='ENTRYPOINT ["/opt/fit-framework/bin/start.sh"]' app-builder-tmp ${REPO}/app-builder:${DEV_VERSION} diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index c6e37e5f19..a3630f7025 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -118,6 +118,7 @@ services: - "./app-platform-tmp/app-builder:/var/share" ports: - "8004:8004" + - "5005:5005" fit-runtime-java: container_name: fit-runtime-java diff --git a/frontend/src/locale/en_US.json b/frontend/src/locale/en_US.json index d0a1876301..d0047ab6b1 100644 --- a/frontend/src/locale/en_US.json +++ b/frontend/src/locale/en_US.json @@ -241,6 +241,8 @@ "plsEnterValidNumber": "Enter a number", "plsEnterNumber": "Enter a number", "debugRun": "Test Running", + "disableConnectionLimit": "Lift Link Limits", + "restoreConnectionLimit": "Restore Link Limits", "startNode": "Start Node", "run": "Run", "publish": "Release", diff --git a/frontend/src/locale/zh_CN.json b/frontend/src/locale/zh_CN.json index 5cf7a5eb1c..11e60112b3 100644 --- a/frontend/src/locale/zh_CN.json +++ b/frontend/src/locale/zh_CN.json @@ -80,6 +80,8 @@ "intelligentCreate": "智能生成", "createWorkflow": "创建工作流", "debugRun": "测试运行", + "disableConnectionLimit": "放开连线限制", + "restoreConnectionLimit": "恢复连线限制", "support": "支持", "character": "字符", "number": "数字", diff --git a/frontend/src/pages/addFlow/components/addflow-header.tsx b/frontend/src/pages/addFlow/components/addflow-header.tsx index 2a161db9ab..7a58cd95b4 100644 --- a/frontend/src/pages/addFlow/components/addflow-header.tsx +++ b/frontend/src/pages/addFlow/components/addflow-header.tsx @@ -37,7 +37,15 @@ import timeImg from '@/assets/images/ai/time.png'; const AddHeader = (props) => { const dispatch = useAppDispatch(); const { t } = useTranslation(); - const { handleDebugClick, workFlow, types, saveTime, updateAippCallBack } = props; + const { + handleDebugClick, + isConnectionLimitDisabled, + workFlow, + types, + saveTime, + toggleConnectionLimitDisabled, + updateAippCallBack, + } = props; const { appInfo, setFlowInfo } = useContext(FlowContext); const [open, setOpen] = useState(false); const [imgPath, setImgPath] = useState(''); @@ -127,6 +135,13 @@ const AddHeader = (props) => { } + } {showElsa &&