import time
from gradio_client import Client, handle_file
import json
import re
import os
# def extract_invoice_info(markdown_text):
# try:
# # 提取发票号码
# invoice_number = re.search(r'发票号码:\s*(\d+)', markdown_text)
# if not invoice_number:
# raise ValueError("无法提取发票号码")
# # 提取销售方名称
# seller_section = markdown_text.split('销售方信息')[-1]
# seller_name = re.search(r'名称:\s*(.*?)\n', seller_section)
# if not seller_name:
# raise ValueError("无法提取销售方名称")
# # 提取小写金额
# amount = re.search(r'\(小写\)\s*¥(\d+\.\d+)', markdown_text)
# if not amount:
# raise ValueError("无法提取小写金额")
# return {
# "发票号码": invoice_number.group(1),
# "销售方名称": seller_name.group(1).strip(),
# "金额": amount.group(1)
# }
# except Exception as e:
# print(f"提取信息时出错: {e}")
# return None
# Sample recognised text of a single e-invoice, kept as a fixture for trying
# extract_invoice_info() by hand (see the commented-out call in the
# ``__main__`` section). Note that extract_invoice_info's parameter has the same
# name and shadows this global inside the function.
# NOTE(review): the item table here is pipe-delimited with one cell per line;
# confirm that this matches what the conversion server really returns.
markdown_text = """ Page 1 of 1
| | |
| --- | --- |
| [QR Code] | |
**发票信息**
电子发票(普通发票)
国家税务总局厦门市税务局
厦门市税务局
发票号码: 25947000000028179639
开票日期: 2025年05月20日
**购买方信息**
名称: 集美大学
统一社会信用代码/纳税人识别号: 12350000426600329N
**销售方信息**
名称: 厦门京东东和贸易有限公司
统一社会信用代码/纳税人识别号: 91350212MA34A9L25L
| 项目名称 |
规格型号 |
单位 |
数量 |
单价 |
金额 |
税率/征收率 |
税额 |
| *计算机配套产品*金骏芦宝 24V1A电源适配器1000mA适用于按摩器甩脂机瘦身腰带LED台灯吸尘器扫地机器人加湿器充电器电源线 |
ROSE-240100C |
个 |
2 |
23.01 |
46.02 |
13% |
5.98 |
| *24V2A电源适配器1000mA |
|
|
|
|
-7.18 |
13% |
-0.93 |
| 合计 |
|
|
|
|
¥38.84 |
|
¥5.05 |
| 价税合计(大写) |
肆拾叁圆捌角玖分 |
|
|
|
(小写) ¥43.89 |
|
|
备 注 订单号:316584139470
开票人: 王梅"""
# Patterns for the HTML table markup that carries the invoice line items.
# Attributes on the tags are tolerated and cells may span several lines.
_TABLE_RE = re.compile(r'<table\b[^>]*>(.*?)(?:</table>|\Z)', re.DOTALL | re.IGNORECASE)
_ROW_RE = re.compile(r'<tr\b[^>]*>(.*?)</tr>', re.DOTALL | re.IGNORECASE)
_CELL_RE = re.compile(r'<td\b[^>]*>(.*?)</td>', re.DOTALL | re.IGNORECASE)

# Placeholder stored (and printed) when a field cannot be read from a row.
_MISSING_FIELD = "无"


def _require_group(pattern, text, error_message):
    """Return group 1 of the first match of *pattern*, or raise ValueError."""
    match = re.search(pattern, text)
    if not match:
        raise ValueError(error_message)
    return match.group(1)


def _parse_item_row(row_html):
    """Build the item dict for one table row, or None if it has no <td> cells."""
    cells = [cell.strip() for cell in _CELL_RE.findall(row_html)]
    if not cells:
        # e.g. a header row made only of <th> cells: not a line item.
        return None
    model = cells[1] if len(cells) > 1 else _MISSING_FIELD
    # The first two cells are already taken as name and model, so only the
    # remaining cells are searched; otherwise an all-digit model number would
    # be reported as the quantity.
    quantity = next(
        (cell for cell in cells[2:] if re.fullmatch(r'\d+', cell)),
        _MISSING_FIELD,
    )
    return {"name": cells[0], "model": model, "quantity": quantity}


def extract_invoice_info(markdown_text):
    """Extract the key fields of an invoice from recognised text.

    The invoice number, seller name and total amount are read from labelled
    lines; the line items are read from the first HTML ``<table>`` in the
    text. The last two rows of that table are treated as summary rows and
    are not reported as items.

    Args:
        markdown_text: Recognised invoice text containing the labelled
            header lines and an HTML table with the line items.

    Returns:
        A dict with the keys ``invoice_number``, ``seller_name``,
        ``total_amount`` (all strings) and ``items`` (a list of dicts with
        ``name``, ``model`` and ``quantity``), or ``None`` when a required
        part cannot be found. Failures are printed, never raised.
    """
    try:
        invoice_number = _require_group(
            r'发票号码:\s*(\d+)', markdown_text, "未找到发票号码信息"
        )

        # Buyer and seller both carry a "名称:" line, so only the text after
        # the seller heading is searched.
        seller_parts = markdown_text.split('销售方信息')
        if len(seller_parts) < 2:
            raise ValueError("未找到销售方信息部分")
        # Capture up to the end of the line; unlike a pattern ending in "\n"
        # this also works when the name is the very last line of the text.
        seller_name = _require_group(
            r'名称:\s*([^\n]*)', seller_parts[-1], "未找到销售方名称"
        ).strip()

        amount = _require_group(
            r'\(小写\)\s*¥(\d+\.\d+)', markdown_text, "未找到金额信息"
        )

        invoice_data = {
            "invoice_number": invoice_number,
            "seller_name": seller_name,
            "total_amount": amount,
            "items": []
        }

        table_match = _TABLE_RE.search(markdown_text)
        if not table_match:
            raise ValueError("未找到商品明细部分")

        print("发票号码:", invoice_number)
        print("销售方名称:", seller_name)
        print("金额:", amount)

        table_rows = _ROW_RE.findall(table_match.group(1))
        if len(table_rows) < 3:
            raise ValueError("商品明细数据不完整")

        # The final two rows hold the totals, not purchasable items.
        for row in table_rows[:-2]:
            item_data = _parse_item_row(row)
            if item_data is None:
                continue
            print("项目名称:", item_data["name"])
            print("规格型号:", item_data["model"])
            print("数量:", item_data["quantity"])
            invoice_data["items"].append(item_data)

        return invoice_data
    except Exception as e:
        # Deliberate best-effort boundary: callers receive None instead of an
        # exception for any input that cannot be parsed.
        print(f"解析发票信息时出错: {str(e)}")
        return None
def convert_pdf_to_markdown(
    file_paths: list[str],
    client
):
    """
    Submit files to the server's ``/process_markdown_streaming`` endpoint.

    Args:
        file_paths: Paths of the files to convert. Each one is wrapped with
            ``handle_file`` and sent under the ``"image"`` key.
        client: Object exposing ``predict(images=..., api_name=...)``; the
            ``__main__`` section passes a ``gradio_client.Client``.

    Returns:
        Whatever ``client.predict`` returns, unmodified.
        NOTE(review): presumably the converted markdown text - confirm
        against the server's API.
    """
    # One {"image": <file handle>} entry per input path, in input order.
    payload = []
    for path in file_paths:
        payload.append({"image": handle_file(path)})

    return client.predict(
        images=payload,
        api_name="/process_markdown_streaming",
    )
def get_pdf_files(directory):
    """Return the paths of all ``.pdf`` files found under *directory*.

    The tree is walked recursively with ``os.walk`` and the extension check
    is case-insensitive. Paths are built by joining each file name onto the
    folder it was found in, in the order ``os.walk`` reports them.
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
        if name.lower().endswith('.pdf')
    ]
if __name__ == "__main__":
    # To check the parser without contacting a server, call
    # extract_invoice_info(markdown_text) on the sample text instead.

    # The server can be a local host or a public share URL such as
    # `https://6986bdd23daef6f7eb.gradio.live/`. Share URLs are temporary, so
    # the address and the credentials can be overridden through the
    # environment instead of editing this file; the previous hard-coded
    # values remain the defaults.
    CLIENT_URL = os.environ.get(
        "DOCEXT_URL", "https://fceec28e477468b094.gradio.live/"
    )
    username = os.environ.get("DOCEXT_USERNAME", "admin")
    password = os.environ.get("DOCEXT_PASSWORD", "admin")
    client = Client(CLIENT_URL, auth=(username, password))

    pdf_directory = "pdfs"
    pdf_files = get_pdf_files(pdf_directory)
    print(pdf_files)

    for pdf_file in pdf_files:
        print(f"Found PDF file: {pdf_file}")
        # Each PDF is converted on its own so that one invoice is parsed
        # per response.
        markdown_content = convert_pdf_to_markdown(
            [pdf_file], client
        )
        invoice_info = extract_invoice_info(markdown_content)
        print(f"Extracted invoice info: {invoice_info}")