394 lines
16 KiB
Python
394 lines
16 KiB
Python
import random
|
|
import openai
|
|
import time
|
|
import json
|
|
import argparse
|
|
import tiktoken
|
|
from openai import OpenAI
|
|
from openai import OpenAIError
|
|
|
|
|
|
client = OpenAI(
|
|
api_key="sk-5f06261529bb44df86d9b2fdbae1a6b5", # 在这里将 MOONSHOT_API_KEY 替换为你从 Kimi 开放平台申请的 API Key
|
|
base_url="https://api.deepseek.com/v1",
|
|
)
|
|
|
|
def get_qa_response(model, question, answer):
|
|
message = [
|
|
{"role": "system", "content":"You are a huallucination detector. You MUST determine if the provided answer contains hallucination or not for the question based on the world knowledge. The answer you provided MUST be \"Yes\" or \"No\""},
|
|
{"role": "user", "content":
|
|
"\n\n#Question#: " + question +
|
|
"\n#Answer#: " + answer +
|
|
"\n#Your Judgement#: "}
|
|
]
|
|
prompt = "\n#Question#: " + question + "\n#Answer#: " + answer + "\n#Your Judgement#:"
|
|
while True:
|
|
try:
|
|
if model == "gpt-3.5-turbo":
|
|
res = openai.ChatCompletion.create(
|
|
model="gpt-3.5-turbo",
|
|
messages=message,
|
|
temperature=0.0,
|
|
)
|
|
response = res['choices'][0]['message']['content']
|
|
else:
|
|
res = client.chat.completions.create(
|
|
model="deepseek-chat",
|
|
messages=message,
|
|
temperature=1,
|
|
max_tokens=256,
|
|
top_p=1
|
|
)
|
|
response = res.choices[0].message.content
|
|
break
|
|
except OpenAIError:
|
|
print('openai.error.RateLimitError\nRetrying...')
|
|
time.sleep(60)
|
|
except openai.error.ServiceUnavailableError:
|
|
print('openai.error.ServiceUnavailableError\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.Timeout:
|
|
print('openai.error.Timeout\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.APIError:
|
|
print('openai.error.APIError\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.APIConnectionError:
|
|
print('openai.error.APIConnectionError\nRetrying...')
|
|
time.sleep(20)
|
|
|
|
return response
|
|
|
|
|
|
def get_dialogue_response(model, dialog, response, instruction):
|
|
message = [
|
|
{"role": "system", "content": "You are a response judge. You MUST determine if the provided response contains non-factual or hallucinated information. The answer you give MUST be \"Yes\" or \"No\""},
|
|
{"role": "user", "content": instruction +
|
|
"\n\n#Dialogue History#: " + dialog +
|
|
"\n#Response#: " + response +
|
|
"\n#Your Judgement#: "}
|
|
]
|
|
prompt = instruction + "\n\n#Dialogue History#: " + dialog + "\n#Response#: " + response + "\n#Your Judgement#:"
|
|
while True:
|
|
try:
|
|
if model == "gpt-3.5-turbo":
|
|
res = openai.ChatCompletion.create(
|
|
model="gpt-3.5-turbo",
|
|
messages=message,
|
|
temperature=0.0,
|
|
)
|
|
response = res['choices'][0]['message']['content']
|
|
else:
|
|
res = openai.Completion.create(
|
|
model=model,
|
|
prompt=prompt,
|
|
temperature=0.0
|
|
)
|
|
response = res["choices"][0]['text'].strip()
|
|
break
|
|
except openai.error.RateLimitError:
|
|
print('openai.error.RateLimitError\nRetrying...')
|
|
time.sleep(60)
|
|
except openai.error.ServiceUnavailableError:
|
|
print('openai.error.ServiceUnavailableError\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.Timeout:
|
|
print('openai.error.Timeout\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.APIError:
|
|
print('openai.error.APIError\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.APIConnectionError:
|
|
print('openai.error.APIConnectionError\nRetrying...')
|
|
time.sleep(20)
|
|
|
|
return response
|
|
|
|
|
|
def num_tokens_from_message(message, model="davinci"):
|
|
encoding = tiktoken.encoding_for_model(model)
|
|
num_tokens = len(encoding.encode(message))
|
|
return num_tokens
|
|
|
|
|
|
def truncate_message(prompt1, prompt2, model="davinci"):
|
|
if num_tokens_from_message(prompt1 + prompt2, model) > 2033:
|
|
truncation_length = 2033 - num_tokens_from_message(prompt2)
|
|
while num_tokens_from_message(prompt1) > truncation_length:
|
|
prompt1 = " ".join(prompt1.split()[:-1])
|
|
prompt = prompt1 + prompt2
|
|
return prompt
|
|
|
|
|
|
def get_summarization_response(model, document, summary, instruction):
|
|
message = [
|
|
{"role": "system", "content": "You are a summary judge. You MUST determine if the provided summary contains non-factual or hallucinated information. The answer you give MUST be \"Yes\" or \"No\""},
|
|
{"role": "user", "content": instruction +
|
|
"\n\n#Document#: " + document +
|
|
"\n#Summary#: " + summary +
|
|
"\n#Your Judgement#: "}
|
|
]
|
|
prompt1 = instruction + "\n\n#Document#: " + document
|
|
prompt2 = "\n#Summary#: " + summary + "\n#Your Judgement#:"
|
|
if model == "davinci":
|
|
prompt = truncate_message(prompt1, prompt2)
|
|
else:
|
|
prompt = prompt1 + prompt2
|
|
while True:
|
|
try:
|
|
if model == "gpt-3.5-turbo":
|
|
res = openai.ChatCompletion.create(
|
|
model="gpt-3.5-turbo",
|
|
messages=message,
|
|
temperature=0.0,
|
|
)
|
|
response = res['choices'][0]['message']['content']
|
|
else:
|
|
res = openai.Completion.create(
|
|
model=model,
|
|
prompt=prompt,
|
|
temperature=0.0
|
|
)
|
|
response = res["choices"][0]['text'].strip()
|
|
break
|
|
except openai.error.RateLimitError:
|
|
print('openai.error.RateLimitError\nRetrying...')
|
|
time.sleep(60)
|
|
except openai.error.ServiceUnavailableError:
|
|
print('openai.error.ServiceUnavailableError\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.Timeout:
|
|
print('openai.error.Timeout\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.APIError:
|
|
print('openai.error.APIError\nRetrying...')
|
|
time.sleep(20)
|
|
except openai.error.APIConnectionError:
|
|
print('openai.error.APIConnectionError\nRetrying...')
|
|
time.sleep(20)
|
|
|
|
return response
|
|
|
|
|
|
def evaluation_qa_dataset(model, file, output_path):
|
|
result=[]
|
|
TP = 0
|
|
FP=0
|
|
FN=0
|
|
# test_file=json.loads(file)
|
|
with open(file, 'r', encoding="utf-8") as f:
|
|
# print(f"File content: {file}")
|
|
test_file=json.load(f)
|
|
data = []
|
|
for i in range(len(test_file)):
|
|
data.append(test_file[i])
|
|
for i in range(len(data)):
|
|
question= data[i]["Question"]
|
|
answer=data[i]["Answer"]
|
|
ground_truth = data[i]["Hallucination"]
|
|
|
|
output_samples = get_qa_response(model, question, answer)
|
|
print('sample {} success......'.format(i))
|
|
if ("Yes" in output_samples and "YES" in ground_truth):
|
|
TP=TP+1
|
|
elif ("Yes" in output_samples and "NO" in ground_truth):
|
|
FP=FP+1
|
|
else:
|
|
FN=FN+1
|
|
result.append({"Question":question,"Answer":answer,"Hallucination":ground_truth, "res":output_samples})
|
|
Precision=TP/(TP+FP)
|
|
Recall=TP/(TP+FN)
|
|
F1score=2*(Precision*Recall)/(Precision+Recall)
|
|
print(' F1score: {}'.format(F1score))
|
|
dump_jsonl(result, output_path, append=True)
|
|
# correct = 0
|
|
# incorrect = 0
|
|
# for i in range(len(data)):
|
|
# knowledge = data[i]["knowledge"]
|
|
# question = data[i]["question"]
|
|
# hallucinated_answer = data[i]["hallucinated_answer"]
|
|
# right_answer = data[i]["right_answer"]
|
|
|
|
# if random.random() > 0.5:
|
|
# answer = hallucinated_answer
|
|
# ground_truth = "Yes"
|
|
# else:
|
|
# answer = right_answer
|
|
# ground_truth = "No"
|
|
|
|
# ans = get_qa_response(model, question, answer, instruction)
|
|
# ans = ans.replace(".", "")
|
|
|
|
# if ("Yes" in ans and "No" in ans) or ("Yes" not in ans and "No" not in ans):
|
|
# gen = {"knowledge": knowledge, "question": question, "answer": answer, "ground_truth": ground_truth, "judgement": "failed!"}
|
|
# dump_jsonl(gen, output_path, append=True)
|
|
# incorrect += 1
|
|
# print('sample {} fails......'.format(i))
|
|
# continue
|
|
# elif "Yes" in ans:
|
|
# if ans != "Yes":
|
|
# ans = "Yes"
|
|
# gen = {"knowledge": knowledge, "question": question, "answer": answer, "ground_truth": ground_truth, "judgement": ans}
|
|
# elif "No" in ans:
|
|
# if ans != "No":
|
|
# ans = "No"
|
|
# gen = {"knowledge": knowledge, "question": question, "answer": answer, "ground_truth": ground_truth, "judgement": ans}
|
|
# else:
|
|
# gen = None
|
|
# incorrect += 1
|
|
|
|
# assert(gen is not None)
|
|
|
|
# if ground_truth == ans:
|
|
# correct += 1
|
|
# else:
|
|
# incorrect += 1
|
|
|
|
# print('sample {} success......'.format(i))
|
|
|
|
|
|
# print('{} correct samples, {} incorrect samples, Accuracy: {}'.format(correct, incorrect, correct/len(data)))
|
|
|
|
|
|
def evaluation_dialogue_dataset(model, file, instruction, output_path):
|
|
with open(file, 'r', encoding="utf-8") as f:
|
|
data = []
|
|
for line in f:
|
|
data.append(json.loads(line))
|
|
|
|
correct = 0
|
|
incorrect = 0
|
|
for i in range(len(data)):
|
|
knowledge = data[i]["knowledge"]
|
|
dialog = data[i]["dialogue_history"]
|
|
hallucinated_response = data[i]["hallucinated_response"]
|
|
right_response = data[i]["right_response"]
|
|
|
|
if random.random() > 0.5:
|
|
response = hallucinated_response
|
|
ground_truth = "Yes"
|
|
else:
|
|
response = right_response
|
|
ground_truth = "No"
|
|
|
|
ans = get_dialogue_response(model, dialog, response, instruction)
|
|
ans = ans.replace(".", "")
|
|
|
|
if ("Yes" in ans and "No" in ans) or ("Yes" not in ans and "No" not in ans):
|
|
gen = {"knowledge": knowledge, "dialogue_history": dialog, "response": response, "ground_truth": ground_truth, "judgement": "failed!"}
|
|
dump_jsonl(gen, output_path, append=True)
|
|
incorrect += 1
|
|
print('sample {} fails......'.format(i))
|
|
continue
|
|
elif "Yes" in ans:
|
|
if ans != "Yes":
|
|
ans = "Yes"
|
|
gen = {"knowledge": knowledge, "dialogue_history": dialog, "response": response, "ground_truth": ground_truth, "judgement": ans}
|
|
elif "No" in ans:
|
|
if ans != "No":
|
|
ans = "No"
|
|
gen = {"knowledge": knowledge, "dialogue_history": dialog, "response": response, "ground_truth": ground_truth, "judgement": ans}
|
|
else:
|
|
gen = None
|
|
assert (gen is not None)
|
|
|
|
if ground_truth == ans:
|
|
correct += 1
|
|
else:
|
|
incorrect += 1
|
|
|
|
print('sample {} success......'.format(i))
|
|
dump_jsonl(gen, output_path, append=True)
|
|
|
|
print('{} correct samples, {} incorrect samples, Accuracy: {}'.format(correct, incorrect, correct / len(data)))
|
|
|
|
|
|
def evaluation_summarization_dataset(model, file, instruction, output_path):
|
|
with open(file, 'r', encoding="utf-8") as f:
|
|
data = []
|
|
for line in f:
|
|
data.append(json.loads(line))
|
|
|
|
correct = 0
|
|
incorrect = 0
|
|
for i in range(len(data)):
|
|
|
|
document = data[i]["document"]
|
|
hallucinated_summary = data[i]["hallucinated_summary"]
|
|
right_summary = data[i]["right_summary"]
|
|
|
|
if random.random() > 0.5:
|
|
summary = hallucinated_summary
|
|
ground_truth = "Yes"
|
|
else:
|
|
summary = right_summary
|
|
ground_truth = "No"
|
|
|
|
ans = get_summarization_response(model, document, summary, instruction)
|
|
ans = ans.replace(".", "")
|
|
|
|
if ("Yes" in ans and "No" in ans) or ("Yes" not in ans and "No" not in ans):
|
|
gen = {"document": document, "summary": summary, "ground_truth": ground_truth, "judgement": "failed!"}
|
|
dump_jsonl(gen, output_path, append=True)
|
|
incorrect += 1
|
|
print('sample {} fails......'.format(i))
|
|
continue
|
|
elif "Yes" in ans:
|
|
if ans != "Yes":
|
|
ans = "Yes"
|
|
gen = {"document": document, "summary": summary, "ground_truth": ground_truth, "judgement": ans}
|
|
elif "No" in ans:
|
|
if ans != "No":
|
|
ans = "No"
|
|
gen = {"document": document, "summary": summary, "ground_truth": ground_truth, "judgement": ans}
|
|
else:
|
|
gen = None
|
|
assert (gen is not None)
|
|
|
|
if ground_truth == ans:
|
|
correct += 1
|
|
else:
|
|
incorrect += 1
|
|
|
|
print('sample {} success......'.format(i))
|
|
dump_jsonl(gen, output_path, append=True)
|
|
|
|
print('{} correct samples, {} incorrect samples, Accuracy: {}'.format(correct, incorrect, correct / len(data)))
|
|
|
|
|
|
def dump_jsonl(data, output_path, append=False):
|
|
"""
|
|
Write list of objects to a JSON lines file.
|
|
"""
|
|
mode = 'a+' if append else 'w'
|
|
with open(output_path, mode, encoding='utf-8') as f:
|
|
json_record = json.dumps(data, ensure_ascii=False)
|
|
f.write(json_record + '\n')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(description="Hallucination Generation")
|
|
|
|
parser.add_argument("--task", default="qa", help="qa, dialogue, or summarization")
|
|
parser.add_argument("--model", default="qwen", help="model name")
|
|
args = parser.parse_args()
|
|
|
|
instruction_file = "{}/{}_evaluation_instruction.txt".format(args.task, args.task)
|
|
f = open(instruction_file, 'r', encoding="utf-8")
|
|
instruction = f.read()
|
|
|
|
model = args.model
|
|
output_path = "{}/{}_{}_results.json".format(args.task, args.task, args.model)
|
|
|
|
# data = "../data/{}_data.json".format(args.task)
|
|
data="/home/lee/code/HaluEval/evaluation/factuality_train.json"
|
|
|
|
if args.task == "qa":
|
|
evaluation_qa_dataset(model, data, output_path)
|
|
elif args.task == "dialogue":
|
|
evaluation_dialogue_dataset(model, data, instruction, output_path)
|
|
elif args.task == "summarization":
|
|
evaluation_summarization_dataset(model, data, instruction, output_path)
|
|
else:
|
|
raise ValueError("The task must be qa, dialogue, or summarization!")
|