系列文章索引
J-LangChain 入门
介绍
j-langchain是一个Java版的LangChain开发框架,具有灵活编排和流式执行能力,旨在简化和加速各类大模型应用在Java平台的落地开发。它提供了一组实用的工具和类,使得开发人员能够更轻松地构建类似于LangChain的Java应用程序。
github: https://github.com/flower-trees/j-langchain
复杂智能链流式执行实例
1、分支路由
根据 chain
输入参数 vendor
,判断使用 llama3
、gpt-4
、还是回复 无法回答
。
LangChain实现
# Branch routing: choose which model (or a canned reply) answers, based on the
# "vendor" entry of the chain input.
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_ollama import OllamaLLM
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# LangChain templates use {name} placeholders; "${topic}" would render a
# literal "$" in front of the substituted topic.
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")


def route(info):
    """Return the runnable that should handle ``info``, selected by ``info["vendor"]``."""
    if "ollama" in info["vendor"]:
        return prompt | OllamaLLM(model="llama3:8b")
    elif "chatgpt" in info["vendor"]:
        return prompt | ChatOpenAI(model="gpt-4")
    else:
        return prompt | RunnableLambda(lambda x: "sorry, I don't know how to do that")


chain = route | StrOutputParser()

result = chain.stream({"topic": "bears", "vendor": "ollama"})
for chunk in result:
    # Flush on every chunk so the streamed text appears as it arrives.
    print(chunk, end="", flush=True)
J-LangChain实现
使用 flow组件
原生功能 .next()
实现分支执行 子chain
。
// Excerpt (elided lines are shown as "......"): a single .next(...) call that
// receives several Info.c(...) entries. The first two entries pair a condition on
// the `vendor` input with the model to run; the last entry carries no condition
// and returns a fixed apology string (presumably the fallback when nothing matches).
FlowInstance chain = chainActor.builder()
......
.next(
Info.c("vendor == 'ollama'", chatOllama),
Info.c("vendor == 'chatgpt'", chatOpenAI),
Info.c(input -> "sorry, I don't know how to do that")
)
......
/**
 * Branch-routing demo: renders a joke prompt, then selects the model by the
 * {@code vendor} input ({@code ollama} or {@code chatgpt}); the unconditioned
 * entry returns a fixed apology string. The streamed answer is printed
 * cumulatively as chunks arrive.
 *
 * @throws TimeoutException if waiting for the next streamed chunk times out
 *         (presumably raised by the stream iterator's {@code hasNext()})
 */
public void SwitchDemo() throws TimeoutException {
    BaseRunnable<StringPromptValue, ?> prompt = PromptTemplate.fromTemplate("tell me a joke about ${topic}");
    ChatOllama chatOllama = ChatOllama.builder().model("llama3:8b").build();
    ChatOpenAI chatOpenAI = ChatOpenAI.builder().model("gpt-4").build();

    FlowInstance chain = chainActor.builder()
            .next(prompt)
            .next(
                    Info.c("vendor == 'ollama'", chatOllama),
                    Info.c("vendor == 'chatgpt'", chatOpenAI),
                    Info.c(input -> "sorry, I don't know how to do that")
            )
            .next(new StrOutputParser()).build();

    ChatGenerationChunk chunk = chainActor.stream(chain, Map.of("topic", "bears", "vendor", "ollama"));

    // Print the accumulated text after every chunk.
    StringBuilder sb = new StringBuilder();
    while (chunk.getIterator().hasNext()) {
        sb.append(chunk.getIterator().next());
        System.out.println(sb);
    }
}
2、组合嵌套
主chain
调用 子chain
生成一个笑话,并对笑话是否可笑进行评价。
LangChain实现
# Composition: the joke chain is nested inside a second chain that asks the
# model whether the generated joke is funny.
from langchain_ollama import OllamaLLM
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = OllamaLLM(model="llama3:8b")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model | StrOutputParser()

analysis_prompt = ChatPromptTemplate.from_template("is this a funny joke? {joke}")
# The dict feeds the output of `chain` into the {joke} placeholder.
composed_chain = {"joke": chain} | analysis_prompt | model | StrOutputParser()

result = composed_chain.stream({"topic": "bears"})
for chunk in result:
    # Flush on every chunk so the streamed text appears as it arrives.
    print(chunk, end="", flush=True)
J-LangChain实现
flow组件
原生功能支持嵌套执行。
💡 Notes:
- 这里需要
.next(new InvokeChain(chain))
来一次性调用嵌套链,返回结果。
/**
 * Composition demo: an inner chain produces a joke, and an outer chain invokes
 * it, logs the joke, and asks the same model whether the joke is funny. The
 * streamed verdict is printed cumulatively.
 *
 * @throws TimeoutException declared by the original demo; presumably raised
 *         while waiting on the stream iterator
 */
public void ComposeDemo() throws TimeoutException {
    ChatOllama model = ChatOllama.builder().model("llama3:8b").build();
    StrOutputParser outputParser = new StrOutputParser();

    // Inner chain: prompt -> model -> parser.
    BaseRunnable<StringPromptValue, ?> jokePrompt = PromptTemplate.fromTemplate("tell me a joke about ${topic}");
    FlowInstance jokeChain = chainActor.builder()
            .next(jokePrompt)
            .next(model)
            .next(outputParser)
            .build();

    // Outer chain: run the inner chain to completion, then analyse its text.
    BaseRunnable<StringPromptValue, ?> reviewPrompt = PromptTemplate.fromTemplate("is this a funny joke? ${joke}");
    FlowInstance reviewChain = chainActor.builder()
            .next(new InvokeChain(jokeChain)) // invoke the nested chain
            .next(input -> { System.out.printf("joke content: '%s' \n\n", input); return input; })
            .next(input -> Map.of("joke", ((Generation) input).getText()))
            .next(reviewPrompt)
            .next(model)
            .next(outputParser)
            .build();

    ChatGenerationChunk stream = chainActor.stream(reviewChain, Map.of("topic", "bears"));

    StringBuilder printed = new StringBuilder();
    while (true) {
        if (!stream.getIterator().hasNext()) {
            break;
        }
        printed.append(stream.getIterator().next());
        System.out.println(printed);
    }
}
3、并行执行
主chain
并行执行 joke_chain
和 poem_chain
,并以流式(stream)方式交替输出两者的结果。
LangChain实现
# Parallel execution: run the joke and poem chains side by side and print each
# answer cumulatively as its chunks arrive.
# NOTE: `ChatPromptTemplate` and `model` are not defined in this snippet; it
# relies on them already being in scope.
from langchain_core.runnables import RunnableParallel

joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model

parallel_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

result = parallel_chain.stream({"topic": "bear"})

joke = "joke: "
poem = "poem: "
for chunk in result:
    # Each streamed chunk is keyed by the branch that produced it.
    if 'joke' in chunk:
        joke += chunk['joke']
        print(joke, flush=True)
    if 'poem' in chunk:
        poem += chunk['poem']
        print(poem, flush=True)
输出:
joke: Why
joke: Why did
joke: Why did the
poem: Bear
joke: Why did the bear
poem: Bear stands
joke: Why did the bear break
poem: Bear stands tall
joke: Why did the bear break up
poem: Bear stands tall,
joke: Why did the bear break up with
poem: Bear stands tall, wise
joke: Why did the bear break up with his
poem: Bear stands tall, wise and
......
J-LangChain实现
使用 flow组件
原生功能 .concurrent()
实现并发执行。
// Excerpt: .concurrent(...) receives a result-merging callback followed by the
// sub-chains (jokeChain, poemChain). The callback builds a map keyed "joke" /
// "poem" from each sub-chain's result, looked up on the context bus by flow id.
FlowInstance chain = chainActor.builder()
.concurrent(
(IResult<Map<String, AIMessageChunk>>) (iContextBus, isTimeout) ->
Map.of("joke", iContextBus.getResult(jokeChain.getFlowId()), "poem", iContextBus.getResult(poemChain.getFlowId())),
jokeChain, poemChain
).build();
💡 Notes:
- 这里
ChatOllama
并不是线程安全的,并发时需要new
新实例。
/**
 * Parallel demo: runs a joke chain and a poem chain concurrently and prints
 * both streams cumulatively, each on its own asynchronous task.
 *
 * <p>Both printing tasks are awaited before returning; joining only one of them
 * would let the method return while the other stream is still being printed.
 * Each sub-chain is given its own {@code ChatOllama} instance.
 */
public void ParallelDemo() {
    BaseRunnable<StringPromptValue, ?> joke = PromptTemplate.fromTemplate("tell me a joke about ${topic}");
    BaseRunnable<StringPromptValue, ?> poem = PromptTemplate.fromTemplate("write a 2-line poem about ${topic}");

    FlowInstance jokeChain = chainActor.builder().next(joke).next(ChatOllama.builder().model("llama3:8b").build()).build();
    FlowInstance poemChain = chainActor.builder().next(poem).next(ChatOllama.builder().model("llama3:8b").build()).build();

    // The callback merges the two sub-chain results into one map keyed by name.
    FlowInstance chain = chainActor.builder().concurrent(
            (IResult<Map<String, AIMessageChunk>>) (iContextBus, isTimeout) ->
                    Map.of("joke", iContextBus.getResult(jokeChain.getFlowId()), "poem", iContextBus.getResult(poemChain.getFlowId())),
            jokeChain, poemChain
    ).build();

    Map<String, AIMessageChunk> result = chainActor.stream(chain, Map.of("topic", "bears"));

    CompletableFuture<Void> jokePrinter =
            CompletableFuture.runAsync(() -> printStreamedContent("joke: ", result.get("joke")));
    CompletableFuture<Void> poemPrinter =
            CompletableFuture.runAsync(() -> printStreamedContent("poem: ", result.get("poem")));

    // Wait for both streams to be fully printed.
    CompletableFuture.allOf(jokePrinter, poemPrinter).join();
}

/**
 * Drains one streamed message, printing the label plus all content received so
 * far after every chunk.
 *
 * @param label prefix placed at the start of every printed line
 * @param chunk streamed message whose iterator yields the content chunks
 * @throws RuntimeException wrapping the {@link TimeoutException} if waiting for
 *         the next chunk times out
 */
private static void printStreamedContent(String label, AIMessageChunk chunk) {
    StringBuilder text = new StringBuilder().append(label);
    while (true) {
        try {
            if (!chunk.getIterator().hasNext()) {
                break;
            }
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
        text.append(chunk.getIterator().next().getContent());
        System.out.println(text);
    }
}
4、动态路由
chain 1
总结用户问题 topic
,主chain
根据 topic
动态路由执行 langchain_chain
、anthropic_chain
、或者 general_chain
。
LangChain实现
通过 RunnableLambda
实现动态路由:
# Dynamic routing: a first chain classifies the question's topic, then `route`
# picks the specialised chain that answers it.
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import OllamaLLM

# Classifier chain: produces a one-word topic for the question.
chain = (
    PromptTemplate.from_template(
        """Given the user question below, classify it as either being about `LangChain`, `Anthropic`, or `Other`.
Do not respond with more than one word.
<question>
{question}
</question>
Classification:"""
    )
    | OllamaLLM(model="llama3:8b")
    | StrOutputParser()
)

langchain_chain = PromptTemplate.from_template(
    """You are an expert in langchain. \
Always answer questions starting with "As Harrison Chase told me". \
Respond to the following question:
Question: {question}
Answer:"""
) | OllamaLLM(model="llama3:8b")

anthropic_chain = PromptTemplate.from_template(
    """You are an expert in anthropic. \
Always answer questions starting with "As Dario Amodei told me". \
Respond to the following question:
Question: {question}
Answer:"""
) | OllamaLLM(model="llama3:8b")

general_chain = PromptTemplate.from_template(
    """Respond to the following question:
Question: {question}
Answer:"""
) | OllamaLLM(model="llama3:8b")


def route(info):
    """Return the chain matching ``info["topic"]`` (case-insensitive substring match)."""
    if "anthropic" in info["topic"].lower():
        return anthropic_chain
    elif "langchain" in info["topic"].lower():
        return langchain_chain
    else:
        return general_chain


# Compute the topic, carry the original question along, then route.
full_chain = {"topic": chain, "question": lambda x: x["question"]} | RunnableLambda(route)

result = full_chain.stream({"question": "how do I use LangChain?"})
for chunk in result:
    # Flush on every chunk so the streamed text appears as it arrives.
    print(chunk, end="", flush=True)
J-LangChain实现
使用 flow组件
原生功能 .next()
实现动态路由:
// Excerpt (elided lines are shown as "......"): .next(...) with several Info.c(...)
// entries. The first two pair a condition on `topic` with the sub-chain to run;
// the last entry carries no condition (presumably the fallback when nothing matches).
FlowInstance fullChain = chainActor.builder()
......
.next(
Info.c("topic == 'anthropic'", anthropicChain),
Info.c("topic == 'langchain'", langchainChain),
Info.c(generalChain)
)
......
/**
 * Dynamic-routing demo: a classifier chain labels the user question, and the
 * label selects the langchain, anthropic, or general sub-chain that answers it.
 * The streamed answer is printed cumulatively.
 *
 * @throws TimeoutException declared by the original demo; presumably raised
 *         while waiting on the stream iterator
 */
public void RouteDemo() throws TimeoutException {
    ChatOllama llm = ChatOllama.builder().model("llama3:8b").build();

    BaseRunnable<StringPromptValue, Object> prompt = PromptTemplate.fromTemplate(
            """
            Given the user question below, classify it as either being about `LangChain`, `Anthropic`, or `Other`.
            Do not respond with more than one word.
            <question>
            ${question}
            </question>
            Classification:
            """
    );

    // Classifier chain: prompt -> model -> parser.
    FlowInstance chain = chainActor.builder().next(prompt).next(llm).next(new StrOutputParser()).build();

    FlowInstance langchainChain = chainActor.builder().next(PromptTemplate.fromTemplate(
            """
            You are an expert in langchain. \
            Always answer questions starting with "As Harrison Chase told me". \
            Respond to the following question:
            Question: ${question}
            Answer:
            """
    )).next(ChatOllama.builder().model("llama3:8b").build()).build();

    FlowInstance anthropicChain = chainActor.builder().next(PromptTemplate.fromTemplate(
            """
            You are an expert in anthropic. \
            Always answer questions starting with "As Dario Amodei told me". \
            Respond to the following question:
            Question: ${question}
            Answer:
            """
    )).next(ChatOllama.builder().model("llama3:8b").build()).build();

    FlowInstance generalChain = chainActor.builder().next(PromptTemplate.fromTemplate(
            """
            Respond to the following question:
            Question: ${question}
            Answer:
            """
    )).next(ChatOllama.builder().model("llama3:8b").build()).build();

    FlowInstance fullChain = chainActor.builder()
            .next(new InvokeChain(chain)) // invoke the nested classifier chain
            .next(input -> { System.out.printf("topic: '%s' \n\n", input); return input; })
            // The branch conditions below test `topic`, so the classification must be
            // stored under that key. It is trimmed and lower-cased so that a reply such
            // as "Anthropic" matches the lower-case literals in the conditions.
            // NOTE(review): assumes the parser output is a Generation, as in the
            // composition demo's use of getText() - confirm.
            .next(input -> Map.of(
                    "topic", ((Generation) input).getText().trim().toLowerCase(java.util.Locale.ROOT),
                    "question", ((Map<?, ?>) ContextBus.get().getFlowParam()).get("question")))
            .next(input -> { System.out.printf("topic: '%s' \n\n", input); return input; })
            .next(
                    Info.c("topic == 'anthropic'", anthropicChain),
                    Info.c("topic == 'langchain'", langchainChain),
                    Info.c(generalChain)
            ).build();

    AIMessageChunk chunk = chainActor.stream(fullChain, Map.of("question", "how do I use Anthropic?"));

    // Print the accumulated text after every chunk.
    StringBuilder sb = new StringBuilder();
    while (chunk.getIterator().hasNext()) {
        sb.append(chunk.getIterator().next().getContent());
        System.out.println(sb);
    }
}
5、动态构建
主chain
调用 子chain 1
获取加工后的用户问题,子chain 1
根据用户输入问题是否带 对话历史
,判断是调用子chain 2
根据历史修改用户问题,还是直接透传用户问题,主chain
根据最终问题,并添加 system
内容后,给出答案。
LangChain实现
# Dynamic construction: rewrite the question into a standalone one only when a
# chat history is present, then answer it using the retrieved context.
from operator import itemgetter

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain, RunnablePassthrough
from langchain_ollama import OllamaLLM

llm = OllamaLLM(model="llama3:8b")

contextualize_instructions = """Convert the latest user question into a standalone question given the chat history. Don't answer the question, return the question and nothing else (no descriptive text)."""
contextualize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_instructions),
        ("placeholder", "{chat_history}"),
        ("human", "{question}"),
    ]
)
contextualize_question = contextualize_prompt | llm | StrOutputParser()


@chain
def contextualize_if_needed(input_: dict):
    """Return the rewriting chain when a chat history exists, else pass the question through."""
    if input_.get("chat_history"):
        return contextualize_question
    else:
        return RunnablePassthrough() | itemgetter("question")


@chain
def fake_retriever(input_: dict):
    """Stand-in retriever that always returns the same context sentence."""
    return "egypt's population in 2024 is about 111 million"


qa_instructions = (
    """Answer the user question given the following context:\n\n{context}."""
)
qa_prompt = ChatPromptTemplate.from_messages(
    [("system", qa_instructions), ("human", "{question}")]
)

full_chain = (
    RunnablePassthrough.assign(question=contextualize_if_needed).assign(
        context=fake_retriever
    )
    | qa_prompt
    | llm
    | StrOutputParser()
)

result = full_chain.stream({
    "question": "what about egypt",
    "chat_history": [
        ("human", "what's the population of indonesia"),
        ("ai", "about 276 million"),
    ],
})
for chunk in result:
    # Flush on every chunk so the streamed text appears as it arrives.
    print(chunk, end="", flush=True)
J-LangChain实现
使用 flow组件
原生功能 .all()
实现执行 子chain
获取问题,并结合 system内容
给出答案的功能。
// Excerpt (elided lines are shown as "......"): .all(...) receives a result-merging
// callback followed by two Info.c(...) entries. The callback builds the
// "question" / "context" map from the entries' results: the first is looked up by
// the sub-chain's flow id, the second by the alias "fakeRetriever" set via cAlias.
FlowInstance fullChain = chainActor.builder()
.all(
(iContextBus, isTimeout) -> Map.of(
"question", iContextBus.getResult(contextualizeIfNeeded.getFlowId()).toString(),
"context", iContextBus.getResult("fakeRetriever")),
Info.c(contextualizeIfNeeded),
Info.c(input -> "egypt's population in 2024 is about 111 million").cAlias("fakeRetriever")
)
......
/**
 * Dynamic-construction demo: when the input carries a chat history, a sub-chain
 * rewrites the latest question into a standalone one; otherwise the question is
 * passed through unchanged. The main chain then combines the question with a
 * fixed "retrieved" context and streams the model's answer, printing it
 * cumulatively.
 *
 * @throws TimeoutException declared by the original demo; presumably raised
 *         while waiting on the stream iterator
 */
public void DynamicDemo() throws TimeoutException {
    ChatOllama llm = ChatOllama.builder().model("llama3:8b").build();

    String contextualizeInstructions = """
            Convert the latest user question into a standalone question given the chat history. Don't answer the question, return the question and nothing else (no descriptive text).""";

    BaseRunnable<ChatPromptValue, Object> contextualizePrompt = ChatPromptTemplate.fromMessages(
            List.of(
                    Pair.of("system", contextualizeInstructions),
                    Pair.of("placeholder", "${chatHistory}"),
                    Pair.of("human", "${question}")
            )
    );

    FlowInstance contextualizeQuestion = chainActor.builder()
            .next(contextualizePrompt)
            .next(llm)
            .next(new StrOutputParser())
            .build();

    // The pass-through branch returns the bare question rather than a Map: the
    // main chain stringifies this sub-chain's result, and a Map would be rendered
    // as "{question=...}" inside the final prompt. The wildcard cast is used
    // because the input map also holds a List under "chatHistory".
    FlowInstance contextualizeIfNeeded = chainActor.builder().next(
            Info.c("chatHistory != null", new InvokeChain(contextualizeQuestion)),
            Info.c(input -> ((Map<?, ?>) input).get("question"))
    ).build();

    String qaInstructions =
            """
            Answer the user question given the following context:\n\n${context}.
            """;
    BaseRunnable<ChatPromptValue, Object> qaPrompt = ChatPromptTemplate.fromMessages(
            List.of(
                    Pair.of("system", qaInstructions),
                    Pair.of("human", "${question}")
            )
    );

    FlowInstance fullChain = chainActor.builder()
            .all(
                    // Merge both results into the variables expected by qaPrompt.
                    (iContextBus, isTimeout) -> Map.of(
                            "question", iContextBus.getResult(contextualizeIfNeeded.getFlowId()).toString(),
                            "context", iContextBus.getResult("fakeRetriever")),
                    Info.c(contextualizeIfNeeded),
                    Info.c(input -> "egypt's population in 2024 is about 111 million").cAlias("fakeRetriever")
            )
            .next(qaPrompt)
            .next(input -> { System.out.printf("topic: '%s' \n\n", JsonUtil.toJson(input)); return input; })
            .next(llm)
            .next(new StrOutputParser())
            .build();

    ChatGenerationChunk chunk = chainActor.stream(fullChain,
            Map.of(
                    "question", "what about egypt",
                    "chatHistory",
                    List.of(
                            Pair.of("human", "what's the population of indonesia"),
                            Pair.of("ai", "about 276 million")
                    )
            )
    );

    // Print the accumulated text after every chunk.
    StringBuilder sb = new StringBuilder();
    while (chunk.getIterator().hasNext()) {
        sb.append(chunk.getIterator().next().getText());
        System.out.println(sb);
    }
}