使用 Workers、函式與 Cron 觸發器搭配 iii 打造文件智慧後端
重點摘要
本篇教學將建立一套文件智慧工作流程,使用 iii 引擎與 Python SDK,先以背景程序啟動引擎,再連接 Python Worker。完成設定後,分別註冊文字正規化、斷詞、情感分析、關鍵字提取、報表產出與心跳監控等函式,並將它們組合成單一分析管線。接著透過直接呼叫、HTTP 端點、發後不理(fire-and-forget)以及排程 Cron 觸發器來執行相同邏輯。過程中同時追蹤基本的執行狀態,使此工作流程更貼近真實後端系統,而非靜態筆記本展示。完整程式碼請見內文。
In this tutorial, we build a document-intelligence workflow with iii. We begin by installing the iii engine and Python SDK, then start the engine as a background process and connect a Python worker to it. After the setup, we register separate functions for text normalization, tokenization, sentiment analysis, keyword extraction, reporting, and heartbeat tracking. We then combine these functions into a single analysis pipeline and run the same logic via direct invocation, an HTTP endpoint, fire-and-forget execution, and a scheduled cron trigger. Along the way, we also track basic runtime state, making the workflow feel closer to a real backend system than a static notebook demo. Check out the FULL CODES here. Copy CodeCopiedUse a different Browserimport os, sys, subprocess, time, socket, json, threading from collections import Counter HOME = os.path.expanduser("~") BIN_DIR = f"{HOME}/.local/bin" os.environ["PATH"] = BIN_DIR + os.pathsep + os.environ.get("PATH", "") def sh(cmd): print(f"$ {cmd}") subprocess.run(cmd, shell=True, check=True) if not os.path.exists(f"{BIN_DIR}/iii"): sh(f"curl -fsSL https://install.iii.dev/iii/main/install.sh | BIN_DIR={BIN_DIR} sh") sh(f"{sys.executable} -m pip install -q iii-sdk requests") III = f"{BIN_DIR}/iii" sh(f"{III} --version") We start by importing the required Python modules and setting up the local binary path for the III engine. We define a small helper function to run shell commands and install the III engine if it is not already available. We also install the Python SDK and requests package, then verify the iii installation by checking its version. Copy CodeCopiedUse a different BrowserWS_URL, HTTP_URL = "ws://localhost:49134", "http://localhost:3111" engine_log = open("/tmp/iii-engine.log", "w") engine = subprocess.Popen([III, "--use-default-config"], stdout=engine_log, stderr=subprocess.STDOUT) def wait_port(host, port, timeout=90): end = time.time() + timeout while time.time() < end: with socket.socket() as s: s.settimeout(1) try: s.connect((host, port)); return True except OSError: time.sleep(0.5) return False assert wait_port("localhost", 49134), "engine never came up — see /tmp/iii-engine.log" print(f"✓ engine up — WS {WS_URL} | HTTP {HTTP_URL}") from iii import register_worker try: from iii import TriggerAction except Exception: TriggerAction = None worker = register_worker(WS_URL) _STATE = {"docs_analyzed": 0, "heartbeats": 0, "keyword_totals": Counter()} _LOCK = threading.Lock() POSITIVE = {"good","great","love","excellent","happy","fast","reliable","amazing","best","win"} NEGATIVE = {"bad","terrible","hate","slow","broken","sad","worst","bug","crash","fail"} We launch the iii engine as a background process and wait for its WebSocket port to become available. We then connect a Python worker to the running engine and prepare optional support for fire-and-forget triggers. We also define a shared in-memory state, a thread lock, and simple positive and negative word sets for sentiment analysis. Copy CodeCopiedUse a different Browserdef normalize(data): return {"text": (data.get("text") or "").strip().lower()} def tokenize(data): text = data.get("text", "") cleaned = "".join(c if (c.isalnum() or c.isspace()) else " " for c in text) tokens = [t for t in cleaned.split() if t] return {"tokens": tokens, "count": len(tokens)} def sentiment(data): toks = data.get("tokens", []) pos = sum(t in POSITIVE for t in toks) neg = sum(t in NEGATIVE for t in toks) score = pos - neg label = "positive" if score > 0 else "negative" if score < 0 else "neutral" return {"label": label, "score": score, "pos": pos, "neg": neg} def keywords(data): toks = data.get("tokens", []) stop = {"the","a","an","is","it","to","of","and","in","for","on","how"} freq = Counter(t for t in toks if t not in stop and len(t) > 2) return {"keywords": freq.most_common(data.get("top_n", 5))} def analyze(data): norm = worker.trigger({"function_id": "text::normalize", "payload": {"text": data.get("text","")}}) toks = worker.trigger({"function_id": "text::tokenize", "payload": norm}) sent = worker.trigger({"function_id": "text::sentiment", "payload": toks}) keys = worker.trigger({"function_id": "text::keywords", "payload": {**toks, "top_n": data.get("top_n", 5)}}) with _LOCK: _STATE["docs_analyzed"] += 1 for k, c in keys["keywords"]: _STATE["keyword_totals"][k] += c n = _STATE["docs_analyzed"] return {"tokens": toks["count"], "sentiment": sent, "keywords": keys["keywords"], "docs_analyzed": n} def report(data): with _LOCK: return {"docs_analyzed": _STATE["docs_analyzed"], "heartbeats": _STATE["heartbeats"], "top_keywords_all_docs": _STATE["keyword_totals"].most_common(5)} def http_analyze(data): body = data.get("body") or {} result = worker.trigger({"function_id": "pipeline::analyze", "payload": body}) return {"status_code": 200, "body": result, "headers": {"Content-Type": "application/json"}} def heartbeat(data): with _LOCK: _STATE["heartbeats"] += 1 return {"ok": True} for fid, fn in [ ("text::normalize", normalize), ("text::tokenize", tokenize), ("text::sentiment", sentiment), ("text::keywords", keywords), ("pipeline::analyze", analyze), ("stats::report", report), ("http::analyze", http_analyze), ("cron::heartbeat", heartbeat), ]: worker.register_function(fid, fn) We define the core functions used in the text-analysis workflow, including normalization, tokenization, sentiment detection, and keyword extraction. We then create an analysis function that routes each step through the III engine instead of calling everything directly. We also add reporting, HTTP handling, and heartbeat functions before registering all of them with the worker. Copy CodeCopiedUse a different Browserworker.register_trigger({"type": "http", "function_id": "http::analyze", "config": {"api_path": "/analyze", "http_method": "POST"}}) cron_ok = False try: worker.register_trigger({"type": "cron", "function_id": "cron::heartbeat", "config": {"schedule": "*/2 * * * * *"}}) cron_ok = True except Exception as e: print("cron trigger skipped:", e) try: worker.connect() except Exception: pass time.sleep(2) We register an HTTP trigger so that the analysis pipeline can be invoked via a POST request. We also try to register a cron trigger that runs the heartbeat function on a fixed schedule, while safely skipping it if the engine build does not support that schema. We then connect the worker and pause briefly so the registered functions and triggers are ready to use. Copy CodeCopiedUse a different Browserprint("\n=== A) Direct invocation — orchestrated through the engine ===") docs = [ "iii makes the backend amazing and fast, I love how reliable it is", "The legacy gateway was slow and broken, a terrible buggy experience", "Workers register functions and triggers; the engine routes every call", ] for d in docs: r = worker.trigger({"function_id": "pipeline::analyze", "payload": {"text": d, "top_n": 4}}) print(f" [{r['sentiment']['label']:>8}] tokens={r['tokens']:>2} keywords={r['keywords']}") print("\n=== B) The SAME function over HTTP (:3111) — zero handler changes ===") import requests try: resp = requests.post(f"{HTTP_URL}/analyze", json={"text": "great great product, best ever", "top_n": 3}, timeout=10) print(" HTTP", resp.status_code, "->", resp.json()) except Exception as e: print(" HTTP call failed (engine HTTP module/version?):", e) print("\n=== C) Fire-and-forget invocation ===") if TriggerAction: worker.trigger({"function_id": "pipeline::analyze", "payload": {"text": "async win, no waiting"}, "action": TriggerAction.Void()}) print(" dispatched (no result awaited)") else: print(" TriggerAction not in this SDK build — skipping") print("\n=== D) Cron trigger firing on its own ===") if cron_ok: time.sleep(5) print(" heartbeats so far:", worker.trigger({"function_id": "stats::report", "payload": {}})["heartbeats"]) else: print(" cron not registered on this engine build") print("\n=== E) Aggregate state report ===") print(json.dumps(worker.trigger({"function_id": "stats::report", "payload
Related
相關文章

階躍Step 3.7 Flash拿下AA榜第一,讓Agent從「跑Demo」到「能搞錢」
這篇消息聚焦「階躍Step 3.7 Flash拿下AA榜第一,讓Agent從「跑Demo」到「能搞錢」」。原始導語提到:主攻極致速度與高性價比。 從 AI 情報角度來看,這類內容值得關注其背後的技術進展、產品落地、產業競爭與後續市場影響。

Exa獲2.5億美元融資,打造Agent原生的“Google”
這篇消息聚焦「Exa獲2.5億美元融資,打造Agent原生的“Google”」。原始導語提到:AI時代需要把搜索從底層重新做一遍 從 AI 情報角度來看,這類內容值得關注其背後的技術進展、產品落地、產業競爭與後續市場影響。

封了自家元寶,微信AI親自下場
這篇消息聚焦「封了自家元寶,微信AI親自下場」。原始導語提到:聊天框裡,如何再裝下一個AI操作系統。 從 AI 情報角度來看,這類內容值得關注其背後的技術進展、產品落地、產業競爭與後續市場影響。
又一百億估值獨角獸誕生!AI軟件監控創企拿下新融資,去年ARR破6億
智東西 編譯 | 田忠婷 編輯 | 程茜 智東西6月4日報道,昨晚,以色列AI軟件監控獨角獸Coralogix完成2億美元(約合人民幣13.5億元)F輪融資,投後估值達16億美元(約合人民幣108億元)。 該輪融資金額將主要用於AI智能體能力研發、遙測數據技術設施建設和市場擴張三個領域。過去一年,Coralogix的營收增長超過60%,並且其年化收入在一年多前就突破1億美元,在全球擁有包括IBM、Tradeweb和JFrog在內的5000多家客戶。 Coralogix本輪融資由Advent、加拿大養老金計劃投資委員會(CPPIB)和Greenfield共同領投,Brighton Park Capital跟投。 該公司2025年6月17日完成1.15億美元(約合人民幣7.8億元)E輪融資,投後估值超10億美元(約合人民幣68億元),一舉躍升獨角獸企業。距離上輪融資不到1年,Coralogix就完成了新一輪融資,這也是其成立以來最大的單筆融資。目前,Coralogix累計融資金額已達5.5億美元(約合人民幣37億元) ▲Coralogix獲得2億美元融資的公告(圖源:Coralogix) Coralogix由Ariel Assaraf於2014年在以色列創立,總部位於美國波士頓,是一家專注於AI時代軟件系統監控的公司。其核心業務是為企業提供新一代的運維監控系統,以AI Agent替代傳統的監控軟件,從而幫助企業在AI時代實現更智能、更自主的系統運維。 其創始人Ariel Assaraf畢業於以色列開放大學經濟學與數學專業,後獲神經科學與機器學習碩士學位。他曾在以色列安全部門工作,後在Verint等公司任職,於2014年聯合創立Coralogix並擔任CEO。 一、AI Agent倒逼運維變革,傳統監控軟件顯露短板 傳統監控軟件主要依賴儀表盤、固定告警規則和人工分析,已經難以應對
EVA-Bench Data 2.0: 3 Domains, 121 Tools, 213 Scenarios
Back to Articles EVA-Bench Data 2.0: 3 Domains, 121 Tools, 213 Scenarios Enterprise Article Published June 4, 2026 Upvote 1 Tara Bogavelli tarabogavelli Follow ServiceNow-AI Gabrielle Gauthier Melancon gabegma Follow ServiceNow-AI Katrina Stankiewicz kstankiewicz Follow ServiceNow-AI Nifemi Bamgbose onifemibam Follow ServiceNow-AI Fanny Riols FannyRiols Follow ServiceNow-AI Hoang Nguyen hnguy7 Follow ServiceNow-AI Raghav Mehndiratta rmehndir Follow ServiceNow-AI Lindsay Brin lindsaybrin Follow ServiceNow-AI Hari Subramani Hari-sub Follow ServiceNow-AI Anil Madamala anilmadamala Follow ServiceNow-AI Introduction Voice agent failures are often highly domain-specific. A system that flawlessly processes alphanumeric confirmation codes in flight re-booking transactions might stumble when handli

AI Agent 的門票,MiniMax 想先打下來
這篇消息聚焦「AI Agent 的門票,MiniMax 想先打下來」。原始導語提到:為何人人都在 token 焦慮? 從 AI 情報角度來看,這類內容值得關注其背後的技術進展、產品落地、產業競爭與後續市場影響。