mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-29 21:17:05 +02:00
fix(active-memory): align recall timeout with hook runner
Fixes #72606.
This commit is contained in:
@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
|
||||
- fix(security): block npm_execpath injection from workspace .env [AI-assisted]. (#73262) Thanks @pgondhi987.
|
||||
- Tools/web_fetch: decode response bodies from raw bytes using declared HTTP, XML, or HTML meta charsets before extraction, so Shift_JIS and other legacy-charset pages no longer return mojibake. Fixes #72916. Thanks @amknight.
|
||||
- Active Memory: skip payload-less `memory_search` transcript tool results when building debug telemetry, so newer empty entries no longer hide the latest useful debug payload. (#68773) Thanks @SimbaKingjoe.
|
||||
- Active Memory: keep recall setup time from consuming the configured model timeout while giving the hook runner an explicit bounded budget for the plugin, so slow embedded-run setup no longer causes immediate recall timeouts. Fixes #72606. (#72620) Thanks @hyspacex.
|
||||
- Channels/Discord: bound message read/search REST calls, route those actions through Gateway execution, and fall back to `CommandTargetSessionKey` for inbound hook session keys so Discord reads do not hang and hooks still fire when `SessionKey` is empty. Fixes #73431. (#73521) Thanks @amknight.
|
||||
- Plugins/media: auto-enable provider plugins referenced by `agents.defaults.imageGenerationModel`, `videoGenerationModel`, and `musicGenerationModel` primary/fallback refs, so configured Google and MiniMax media providers do not stay disabled behind a restrictive plugin allowlist. Thanks @vincentkoc.
|
||||
- Memory-core/dreaming: retry managed dreaming cron registration after startup when the cron service is not reachable yet, so the scheduled Memory Dreaming Promotion sweep recovers without waiting for heartbeat traffic. Fixes #72841. Thanks @amknight.
|
||||
|
||||
@@ -38,6 +38,7 @@ vi.mock("openclaw/plugin-sdk/session-store-runtime", async () => {
|
||||
|
||||
describe("active-memory plugin", () => {
|
||||
const hooks: Record<string, Function> = {};
|
||||
const hookOptions: Record<string, Record<string, unknown> | undefined> = {};
|
||||
const registeredCommands: Record<string, any> = {};
|
||||
const runEmbeddedPiAgent = vi.fn();
|
||||
let stateDir = "";
|
||||
@@ -105,8 +106,9 @@ describe("active-memory plugin", () => {
|
||||
registerCommand: vi.fn((command) => {
|
||||
registeredCommands[command.name] = command;
|
||||
}),
|
||||
on: vi.fn((hookName: string, handler: Function) => {
|
||||
on: vi.fn((hookName: string, handler: Function, opts?: Record<string, unknown>) => {
|
||||
hooks[hookName] = handler;
|
||||
hookOptions[hookName] = opts;
|
||||
}),
|
||||
};
|
||||
const getActiveMemoryLines = (sessionKey: string): string[] => {
|
||||
@@ -159,6 +161,9 @@ describe("active-memory plugin", () => {
|
||||
for (const key of Object.keys(hooks)) {
|
||||
delete hooks[key];
|
||||
}
|
||||
for (const key of Object.keys(hookOptions)) {
|
||||
delete hookOptions[key];
|
||||
}
|
||||
for (const key of Object.keys(registeredCommands)) {
|
||||
delete registeredCommands[key];
|
||||
}
|
||||
@@ -179,7 +184,10 @@ describe("active-memory plugin", () => {
|
||||
});
|
||||
|
||||
it("registers a before_prompt_build hook", () => {
|
||||
expect(api.on).toHaveBeenCalledWith("before_prompt_build", expect.any(Function));
|
||||
expect(api.on).toHaveBeenCalledWith("before_prompt_build", expect.any(Function), {
|
||||
timeoutMs: 150_000,
|
||||
});
|
||||
expect(hookOptions.before_prompt_build?.timeoutMs).toBe(150_000);
|
||||
});
|
||||
|
||||
it("runs recall without recording shared auth-profile failures", async () => {
|
||||
@@ -567,7 +575,7 @@ describe("active-memory plugin", () => {
|
||||
agents: ["main"],
|
||||
allowedChatTypes: ["explicit"],
|
||||
};
|
||||
await plugin.register(api as unknown as OpenClawPluginApi);
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what should i work on next?", messages: [] },
|
||||
@@ -591,7 +599,7 @@ describe("active-memory plugin", () => {
|
||||
agents: ["main"],
|
||||
allowedChatTypes: ["explicit"],
|
||||
};
|
||||
await plugin.register(api as unknown as OpenClawPluginApi);
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what should i work on next?", messages: [] },
|
||||
@@ -2012,6 +2020,7 @@ describe("active-memory plugin", () => {
|
||||
|
||||
it("does not cache timeout results", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: 1,
|
||||
@@ -2096,6 +2105,7 @@ describe("active-memory plugin", () => {
|
||||
|
||||
it("ignores late subagent payloads once the active-memory timeout signal has fired", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: 1,
|
||||
@@ -2134,10 +2144,44 @@ describe("active-memory plugin", () => {
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("does not spend the model timeout budget on active-memory subagent setup", async () => {
|
||||
const CONFIGURED_TIMEOUT_MS = 10;
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(100);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: CONFIGURED_TIMEOUT_MS,
|
||||
logging: true,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, CONFIGURED_TIMEOUT_MS + 30));
|
||||
return { payloads: [{ text: "remember the ramen place" }] };
|
||||
});
|
||||
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what wings should i order? setup grace", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:setup-grace",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result?.prependContext).toContain("remember the ramen place");
|
||||
expect(runEmbeddedPiAgent.mock.calls.at(-1)?.[0]?.timeoutMs).toBe(CONFIGURED_TIMEOUT_MS);
|
||||
const infoLines = vi
|
||||
.mocked(api.logger.info)
|
||||
.mock.calls.map((call: unknown[]) => String(call[0]));
|
||||
expect(infoLines.some((line: string) => line.includes("status=timeout"))).toBe(false);
|
||||
});
|
||||
|
||||
it("returns timeout within a hard deadline even when the subagent never checks the abort signal", async () => {
|
||||
const CONFIGURED_TIMEOUT_MS = 200;
|
||||
const MARGIN_MS = 500;
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: CONFIGURED_TIMEOUT_MS,
|
||||
|
||||
@@ -35,6 +35,7 @@ const DEFAULT_CACHE_TTL_MS = 15_000;
|
||||
const DEFAULT_MAX_CACHE_ENTRIES = 1000;
|
||||
const CACHE_SWEEP_INTERVAL_MS = 1000;
|
||||
const DEFAULT_MIN_TIMEOUT_MS = 250;
|
||||
const DEFAULT_SETUP_GRACE_TIMEOUT_MS = 30_000;
|
||||
const DEFAULT_QUERY_MODE = "recent" as const;
|
||||
const DEFAULT_QMD_SEARCH_MODE = "search" as const;
|
||||
const DEFAULT_TRANSCRIPT_DIR = "active-memory";
|
||||
@@ -216,6 +217,7 @@ type AsyncLock = <T>(task: () => Promise<T>) => Promise<T>;
|
||||
const toggleStoreLocks = new Map<string, AsyncLock>();
|
||||
let lastActiveRecallCacheSweepAt = 0;
|
||||
let minimumTimeoutMs = DEFAULT_MIN_TIMEOUT_MS;
|
||||
let setupGraceTimeoutMs = DEFAULT_SETUP_GRACE_TIMEOUT_MS;
|
||||
|
||||
function createAsyncLock(): AsyncLock {
|
||||
let lock: Promise<void> = Promise.resolve();
|
||||
@@ -2188,9 +2190,10 @@ async function maybeResolveActiveRecall(params: {
|
||||
const controller = new AbortController();
|
||||
const TIMEOUT_SENTINEL = Symbol("timeout");
|
||||
let sessionFile: string | undefined;
|
||||
const watchdogTimeoutMs = params.config.timeoutMs + setupGraceTimeoutMs;
|
||||
const timeoutId = setTimeout(() => {
|
||||
controller.abort(new Error(`active-memory timeout after ${params.config.timeoutMs}ms`));
|
||||
}, params.config.timeoutMs);
|
||||
controller.abort(new Error(`active-memory timeout after ${watchdogTimeoutMs}ms`));
|
||||
}, watchdogTimeoutMs);
|
||||
timeoutId.unref?.();
|
||||
|
||||
const timeoutPromise = new Promise<typeof TIMEOUT_SENTINEL>((resolve) => {
|
||||
@@ -2428,109 +2431,114 @@ export default definePluginEntry({
|
||||
},
|
||||
});
|
||||
|
||||
api.on("before_prompt_build", async (event, ctx) => {
|
||||
try {
|
||||
refreshLiveConfigFromRuntime();
|
||||
const resolvedAgentId = resolveStatusUpdateAgentId(ctx);
|
||||
const resolvedSessionKey =
|
||||
ctx.sessionKey?.trim() ||
|
||||
(resolvedAgentId
|
||||
? resolveCanonicalSessionKeyFromSessionId({
|
||||
api,
|
||||
agentId: resolvedAgentId,
|
||||
sessionId: ctx.sessionId,
|
||||
})
|
||||
: undefined);
|
||||
const effectiveAgentId =
|
||||
resolvedAgentId || resolveStatusUpdateAgentId({ sessionKey: resolvedSessionKey });
|
||||
if (await isSessionActiveMemoryDisabled({ api, sessionKey: resolvedSessionKey })) {
|
||||
await persistPluginStatusLines({
|
||||
const beforePromptBuildTimeoutMs = 120_000 + setupGraceTimeoutMs;
|
||||
api.on(
|
||||
"before_prompt_build",
|
||||
async (event, ctx) => {
|
||||
try {
|
||||
refreshLiveConfigFromRuntime();
|
||||
const resolvedAgentId = resolveStatusUpdateAgentId(ctx);
|
||||
const resolvedSessionKey =
|
||||
ctx.sessionKey?.trim() ||
|
||||
(resolvedAgentId
|
||||
? resolveCanonicalSessionKeyFromSessionId({
|
||||
api,
|
||||
agentId: resolvedAgentId,
|
||||
sessionId: ctx.sessionId,
|
||||
})
|
||||
: undefined);
|
||||
const effectiveAgentId =
|
||||
resolvedAgentId || resolveStatusUpdateAgentId({ sessionKey: resolvedSessionKey });
|
||||
if (await isSessionActiveMemoryDisabled({ api, sessionKey: resolvedSessionKey })) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (!isEnabledForAgent(config, effectiveAgentId)) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (!isEligibleInteractiveSession(ctx)) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
!isAllowedChatType(config, {
|
||||
...ctx,
|
||||
sessionKey: resolvedSessionKey ?? ctx.sessionKey,
|
||||
mainKey: api.config.session?.mainKey,
|
||||
})
|
||||
) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
!isAllowedChatId(config, {
|
||||
sessionKey: resolvedSessionKey ?? ctx.sessionKey,
|
||||
messageProvider: ctx.messageProvider,
|
||||
})
|
||||
) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
const query = buildQuery({
|
||||
latestUserMessage: event.prompt,
|
||||
recentTurns: extractRecentTurns(event.messages),
|
||||
config,
|
||||
});
|
||||
const result = await maybeResolveActiveRecall({
|
||||
api,
|
||||
config,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (!isEnabledForAgent(config, effectiveAgentId)) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (!isEligibleInteractiveSession(ctx)) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
!isAllowedChatType(config, {
|
||||
...ctx,
|
||||
sessionKey: resolvedSessionKey ?? ctx.sessionKey,
|
||||
mainKey: api.config.session?.mainKey,
|
||||
})
|
||||
) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
!isAllowedChatId(config, {
|
||||
sessionKey: resolvedSessionKey ?? ctx.sessionKey,
|
||||
sessionId: ctx.sessionId,
|
||||
messageProvider: ctx.messageProvider,
|
||||
})
|
||||
) {
|
||||
await persistPluginStatusLines({
|
||||
api,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
channelId: ctx.channelId,
|
||||
query,
|
||||
currentModelProviderId: ctx.modelProviderId,
|
||||
currentModelId: ctx.modelId,
|
||||
});
|
||||
if (!result.summary) {
|
||||
return undefined;
|
||||
}
|
||||
const promptPrefix = buildPromptPrefix(result.summary);
|
||||
if (!promptPrefix) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
prependContext: promptPrefix,
|
||||
};
|
||||
} catch (error) {
|
||||
const message = toSingleLineLogValue(
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
api.logger.warn?.(
|
||||
`active-memory: before_prompt_build failed, skipping memory lookup: ${message}`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
const query = buildQuery({
|
||||
latestUserMessage: event.prompt,
|
||||
recentTurns: extractRecentTurns(event.messages),
|
||||
config,
|
||||
});
|
||||
const result = await maybeResolveActiveRecall({
|
||||
api,
|
||||
config,
|
||||
agentId: effectiveAgentId,
|
||||
sessionKey: resolvedSessionKey,
|
||||
sessionId: ctx.sessionId,
|
||||
messageProvider: ctx.messageProvider,
|
||||
channelId: ctx.channelId,
|
||||
query,
|
||||
currentModelProviderId: ctx.modelProviderId,
|
||||
currentModelId: ctx.modelId,
|
||||
});
|
||||
if (!result.summary) {
|
||||
return undefined;
|
||||
}
|
||||
const promptPrefix = buildPromptPrefix(result.summary);
|
||||
if (!promptPrefix) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
prependContext: promptPrefix,
|
||||
};
|
||||
} catch (error) {
|
||||
const message = toSingleLineLogValue(
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
api.logger.warn?.(
|
||||
`active-memory: before_prompt_build failed, skipping memory lookup: ${message}`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
});
|
||||
},
|
||||
{ timeoutMs: beforePromptBuildTimeoutMs },
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -2548,9 +2556,13 @@ export const __testing = {
|
||||
activeRecallCache.clear();
|
||||
lastActiveRecallCacheSweepAt = 0;
|
||||
minimumTimeoutMs = DEFAULT_MIN_TIMEOUT_MS;
|
||||
setupGraceTimeoutMs = DEFAULT_SETUP_GRACE_TIMEOUT_MS;
|
||||
},
|
||||
setMinimumTimeoutMsForTests(value: number) {
|
||||
minimumTimeoutMs = value;
|
||||
},
|
||||
setSetupGraceTimeoutMsForTests(value: number) {
|
||||
setupGraceTimeoutMs = Math.max(0, Math.floor(value));
|
||||
},
|
||||
setCachedResult,
|
||||
};
|
||||
|
||||
@@ -938,5 +938,6 @@ export type PluginHookRegistration<K extends PluginHookName = PluginHookName> =
|
||||
hookName: K;
|
||||
handler: PluginHookHandlerMap[K];
|
||||
priority?: number;
|
||||
timeoutMs?: number;
|
||||
source: string;
|
||||
};
|
||||
|
||||
@@ -86,4 +86,40 @@ describe("hook correlation fields", () => {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("honors per-hook registration timeouts over the default void hook timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const handler = vi.fn(
|
||||
async () =>
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, 20);
|
||||
}),
|
||||
);
|
||||
addTestHook({
|
||||
registry,
|
||||
pluginId: "plugin-a",
|
||||
hookName: "agent_end",
|
||||
handler: handler as PluginHookRegistration["handler"],
|
||||
timeoutMs: 30,
|
||||
});
|
||||
const logger = {
|
||||
error: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
};
|
||||
|
||||
const runner = createHookRunner(registry, {
|
||||
logger,
|
||||
voidHookTimeoutMsByHook: { agent_end: 5 },
|
||||
});
|
||||
const run = runner.runAgentEnd({ messages: [], success: true }, TEST_PLUGIN_AGENT_CTX);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(20);
|
||||
|
||||
await expect(run).resolves.toBeUndefined();
|
||||
expect(logger.error).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -46,6 +46,7 @@ function addBeforePromptBuildHook(
|
||||
ctx: PluginHookAgentContext,
|
||||
) => PluginHookBeforePromptBuildResult | Promise<PluginHookBeforePromptBuildResult>,
|
||||
priority?: number,
|
||||
timeoutMs?: number,
|
||||
) {
|
||||
addTestHook({
|
||||
registry,
|
||||
@@ -53,6 +54,7 @@ function addBeforePromptBuildHook(
|
||||
hookName: "before_prompt_build",
|
||||
handler: handler as PluginHookRegistration["handler"],
|
||||
priority,
|
||||
timeoutMs,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -264,6 +266,43 @@ describe("model override pipeline wiring", () => {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("honors per-hook registration timeouts over the default modifying hook timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
addBeforePromptBuildHook(
|
||||
registry,
|
||||
"active-memory",
|
||||
async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
return { prependContext: "memory context" };
|
||||
},
|
||||
10,
|
||||
30,
|
||||
);
|
||||
const logger = {
|
||||
error: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
info: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
};
|
||||
const runner = createHookRunner(registry, {
|
||||
logger,
|
||||
modifyingHookTimeoutMsByHook: { before_prompt_build: 5 },
|
||||
});
|
||||
|
||||
const resultPromise = runner.runBeforePromptBuild(
|
||||
{ prompt: "test", messages: [] },
|
||||
stubCtx,
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(20);
|
||||
|
||||
await expect(resultPromise).resolves.toEqual({ prependContext: "memory context" });
|
||||
expect(logger.error).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("graceful degradation + hook detection", () => {
|
||||
|
||||
@@ -78,12 +78,14 @@ export function addTestHook(params: {
|
||||
hookName: PluginHookRegistration["hookName"];
|
||||
handler: PluginHookRegistration["handler"];
|
||||
priority?: number;
|
||||
timeoutMs?: number;
|
||||
}) {
|
||||
params.registry.typedHooks.push({
|
||||
pluginId: params.pluginId,
|
||||
hookName: params.hookName,
|
||||
handler: params.handler,
|
||||
priority: params.priority ?? 0,
|
||||
...(params.timeoutMs !== undefined ? { timeoutMs: params.timeoutMs } : {}),
|
||||
source: "test",
|
||||
} as PluginHookRegistration);
|
||||
}
|
||||
@@ -95,6 +97,7 @@ export function addTestHooks(
|
||||
hookName: PluginHookRegistration["hookName"];
|
||||
handler: PluginHookRegistration["handler"];
|
||||
priority?: number;
|
||||
timeoutMs?: number;
|
||||
}>,
|
||||
) {
|
||||
for (const hook of hooks) {
|
||||
@@ -104,6 +107,7 @@ export function addTestHooks(
|
||||
hookName: hook.hookName,
|
||||
handler: hook.handler,
|
||||
...(hook.priority !== undefined ? { priority: hook.priority } : {}),
|
||||
...(hook.timeoutMs !== undefined ? { timeoutMs: hook.timeoutMs } : {}),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -397,21 +397,33 @@ export function createHookRunner(
|
||||
return typeof (value as { then?: unknown }).then === "function";
|
||||
};
|
||||
|
||||
const getVoidHookTimeoutMs = (hookName: PluginHookName): number | undefined => {
|
||||
const timeoutMs = voidHookTimeoutMsByHook[hookName];
|
||||
const normalizePositiveTimeoutMs = (timeoutMs: number | undefined): number | undefined => {
|
||||
if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.floor(timeoutMs);
|
||||
};
|
||||
|
||||
const getModifyingHookTimeoutMs = (hookName: PluginHookName): number | undefined => {
|
||||
const timeoutMs = modifyingHookTimeoutMsByHook[hookName];
|
||||
if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.floor(timeoutMs);
|
||||
};
|
||||
const getVoidHookTimeoutMs = (
|
||||
hookName: PluginHookName,
|
||||
hook: PluginHookRegistration,
|
||||
): number | undefined =>
|
||||
normalizePositiveTimeoutMs(hook.timeoutMs) ??
|
||||
normalizePositiveTimeoutMs(voidHookTimeoutMsByHook[hookName]);
|
||||
|
||||
const getModifyingHookTimeoutMs = (
|
||||
hookName: PluginHookName,
|
||||
hook: PluginHookRegistration,
|
||||
): number | undefined =>
|
||||
normalizePositiveTimeoutMs(hook.timeoutMs) ??
|
||||
normalizePositiveTimeoutMs(modifyingHookTimeoutMsByHook[hookName]);
|
||||
|
||||
const getClaimingHookTimeoutMs = (
|
||||
hookName: PluginHookName,
|
||||
hook: PluginHookRegistration,
|
||||
): number | undefined =>
|
||||
normalizePositiveTimeoutMs(hook.timeoutMs) ??
|
||||
normalizePositiveTimeoutMs(modifyingHookTimeoutMsByHook[hookName]);
|
||||
|
||||
const withHookTimeout = async <T>(
|
||||
promise: Promise<T>,
|
||||
@@ -467,7 +479,7 @@ export function createHookRunner(
|
||||
const promise = Promise.resolve(
|
||||
(hook.handler as (event: unknown, ctx: unknown) => Promise<void> | void)(event, ctx),
|
||||
);
|
||||
const timeoutMs = getVoidHookTimeoutMs(hookName);
|
||||
const timeoutMs = getVoidHookTimeoutMs(hookName, hook);
|
||||
if (timeoutMs) {
|
||||
await withHookTimeout(promise, timeoutMs, { unref: true });
|
||||
} else {
|
||||
@@ -504,7 +516,7 @@ export function createHookRunner(
|
||||
try {
|
||||
const handler = hook.handler as (event: unknown, ctx: unknown) => Promise<TResult>;
|
||||
const promise = Promise.resolve(handler(event, ctx));
|
||||
const timeoutMs = getModifyingHookTimeoutMs(hookName);
|
||||
const timeoutMs = getModifyingHookTimeoutMs(hookName, hook);
|
||||
const handlerResult = timeoutMs ? await withHookTimeout(promise, timeoutMs) : await promise;
|
||||
|
||||
if (handlerResult !== undefined && handlerResult !== null) {
|
||||
@@ -581,9 +593,11 @@ export function createHookRunner(
|
||||
): Promise<TResult | undefined> {
|
||||
for (const hook of hooks) {
|
||||
try {
|
||||
const handlerResult = await (
|
||||
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>
|
||||
)(event, ctx);
|
||||
const promise = Promise.resolve(
|
||||
(hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>)(event, ctx),
|
||||
);
|
||||
const timeoutMs = getClaimingHookTimeoutMs(hookName, hook);
|
||||
const handlerResult = timeoutMs ? await withHookTimeout(promise, timeoutMs) : await promise;
|
||||
if (handlerResult?.handled) {
|
||||
return handlerResult;
|
||||
}
|
||||
@@ -630,9 +644,11 @@ export function createHookRunner(
|
||||
let firstError: string | null = null;
|
||||
for (const hook of hooks) {
|
||||
try {
|
||||
const handlerResult = await (
|
||||
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>
|
||||
)(event, ctx);
|
||||
const promise = Promise.resolve(
|
||||
(hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>)(event, ctx),
|
||||
);
|
||||
const timeoutMs = getClaimingHookTimeoutMs(hookName, hook);
|
||||
const handlerResult = timeoutMs ? await withHookTimeout(promise, timeoutMs) : await promise;
|
||||
if (handlerResult?.handled) {
|
||||
return { status: "handled", result: handlerResult };
|
||||
}
|
||||
|
||||
@@ -1822,7 +1822,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
record: PluginRecord,
|
||||
hookName: K,
|
||||
handler: PluginHookHandlerMap[K],
|
||||
opts?: { priority?: number },
|
||||
opts?: { priority?: number; timeoutMs?: number },
|
||||
policy?: PluginTypedHookPolicy,
|
||||
) => {
|
||||
if (!isPluginHookName(hookName)) {
|
||||
@@ -1884,6 +1884,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
hookName,
|
||||
handler: effectiveHandler,
|
||||
priority: opts?.priority,
|
||||
...(opts?.timeoutMs !== undefined ? { timeoutMs: opts.timeoutMs } : {}),
|
||||
source: record.source,
|
||||
} as TypedPluginHookRegistration);
|
||||
};
|
||||
|
||||
@@ -2412,7 +2412,7 @@ export type OpenClawPluginApi = {
|
||||
on: <K extends PluginHookName>(
|
||||
hookName: K,
|
||||
handler: PluginHookHandlerMap[K],
|
||||
opts?: { priority?: number },
|
||||
opts?: { priority?: number; timeoutMs?: number },
|
||||
) => void;
|
||||
};
|
||||
|
||||
|
||||
@@ -170,4 +170,36 @@ describe("inbound_claim hook runner", () => {
|
||||
|
||||
expect(result).toEqual({ status: "error", error: "boom" });
|
||||
});
|
||||
|
||||
it("reports targeted per-hook registration timeouts as handler errors", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const logger = {
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
const slow = vi.fn(() => new Promise(() => {}));
|
||||
const { registry, runner } = createHookRunnerWithRegistry(
|
||||
[{ hookName: "inbound_claim", handler: slow }],
|
||||
{ logger },
|
||||
);
|
||||
registry.typedHooks[0].timeoutMs = 5;
|
||||
|
||||
const run = runner.runInboundClaimForPluginOutcome(
|
||||
"test-plugin",
|
||||
inboundClaimEvent,
|
||||
inboundClaimCtx,
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(5);
|
||||
|
||||
await expect(run).resolves.toEqual({ status: "error", error: "timed out after 5ms" });
|
||||
expect(logger.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining(
|
||||
"inbound_claim handler from test-plugin failed: timed out after 5ms",
|
||||
),
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -85,4 +85,45 @@ describe("reply_dispatch hook runner", () => {
|
||||
);
|
||||
expect(succeeding).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("honors per-hook registration timeouts and continues to the next handler", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const logger = {
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
const slow = vi.fn(() => new Promise(() => {}));
|
||||
const succeeding = vi.fn().mockResolvedValue({
|
||||
handled: true,
|
||||
queuedFinal: false,
|
||||
counts: { tool: 1, block: 0, final: 0 },
|
||||
});
|
||||
const { registry, runner } = createHookRunnerWithRegistry(
|
||||
[
|
||||
{ hookName: "reply_dispatch", handler: slow },
|
||||
{ hookName: "reply_dispatch", handler: succeeding },
|
||||
],
|
||||
{ logger },
|
||||
);
|
||||
registry.typedHooks[0].timeoutMs = 5;
|
||||
|
||||
const run = runner.runReplyDispatch(replyDispatchEvent, replyDispatchCtx);
|
||||
await vi.advanceTimersByTimeAsync(5);
|
||||
|
||||
await expect(run).resolves.toEqual({
|
||||
handled: true,
|
||||
queuedFinal: false,
|
||||
counts: { tool: 1, block: 0, final: 0 },
|
||||
});
|
||||
expect(logger.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining(
|
||||
"reply_dispatch handler from test-plugin failed: timed out after 5ms",
|
||||
),
|
||||
);
|
||||
expect(succeeding).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user