+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions lib/sycamore/sycamore/tests/unit/utils/test_pdf_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from io import BytesIO
from pypdf import PdfReader
import pytest
import re
import sycamore
from sycamore.data import Element
from sycamore.utils.pdf_utils import (
flatten_selected_pages,
filter_elements_by_page,
select_pdf_pages,
select_pages,
)
from sycamore.tests.config import TEST_DIR


def test_flatten_selected_pages_single_page():
    """A single page other than page 1 is returned alone and remapped to position 1."""
    expected = ([3], {1: 3})
    assert flatten_selected_pages([3], 10) == expected


def test_flatten_selected_pages_page_range():
    """An inclusive [start, end] range expands to every page it covers."""
    expected_pages = [2, 3, 4]
    expected_remap = {1: 2, 2: 3, 3: 4}
    assert flatten_selected_pages([[2, 4]], 10) == (expected_pages, expected_remap)


def test_flatten_selected_pages_mixed():
    """Single pages and ranges can be combined; page 1 keeps its position and is not remapped."""
    selection = [1, [3, 5], 7]
    expected_pages = [1, 3, 4, 5, 7]
    expected_remap = {2: 3, 3: 4, 4: 5, 5: 7}
    assert flatten_selected_pages(selection, 10) == (expected_pages, expected_remap)


def test_flatten_selected_pages_out_of_order():
    """Selections are flattened in the order given, not sorted."""
    expected_pages = [5, 6, 7, 2, 3, 4]
    # No page lands on its own number here, so every new position appears in the remap.
    expected_remap = dict(enumerate(expected_pages, start=1))
    outcome = flatten_selected_pages([[5, 7], 2, [3, 4]], 10)
    assert outcome == (expected_pages, expected_remap)


def test_flatten_selected_pages_invalid_range():
    """A range whose end precedes its start is rejected with a ValueError."""
    expected_message = re.escape("For selected_pages like [a, b] it must be that a <= b.")
    with pytest.raises(ValueError) as excinfo:
        flatten_selected_pages([[5, 3]], 10)
    # Same regex search against the exception text that ``match=`` performs.
    excinfo.match(expected_message)


def test_flatten_selected_pages_overlapping():
    """Ranges that share a page are rejected with a ValueError."""
    # ``match`` is a regex; escape the message so the trailing "." is matched literally.
    expected_message = re.escape("selected_pages may not include overlapping pages.")
    with pytest.raises(ValueError, match=expected_message):
        flatten_selected_pages([[1, 3], [2, 4]], 10)


def test_flatten_selected_pages_out_of_bounds():
    """A page number above the page count is rejected with a ValueError."""
    with pytest.raises(ValueError) as excinfo:
        flatten_selected_pages([11], 10)
    excinfo.match("Invalid page number")


def test_flatten_selected_pages_zero_page():
    """Page 0 is invalid because pages are 1-indexed."""
    expected_message = "Invalid page number"
    with pytest.raises(ValueError) as excinfo:
        flatten_selected_pages([0], 10)
    excinfo.match(expected_message)


def test_flatten_selected_pages_invalid_type():
    """An entry that is neither an int nor a list (here a string) is rejected."""
    bad_selection = ["1"]
    expected_message = "Page selection must either be an integer or a 2-element list"
    with pytest.raises(ValueError, match=expected_message):
        flatten_selected_pages(bad_selection, 10)


def test_flatten_selected_pages_empty_input():
    """An empty selection yields no pages and no remapping."""
    nothing_selected = ([], {})
    assert flatten_selected_pages([], 10) == nothing_selected


def test_flatten_selected_pages_all_pages():
    """Selecting the whole document in order needs no remapping."""
    page_count = 10
    every_page = list(range(1, page_count + 1))
    outcome = flatten_selected_pages([[1, page_count]], page_count)
    assert outcome == (every_page, {})


def test_flatten_selected_pages_single_page_as_range():
    """A degenerate range [n, n] behaves like the single page n."""
    expected = ([3], {1: 3})
    assert flatten_selected_pages([[3, 3]], 10) == expected


def test_select_pdf_pages():
    """Selecting three pages from a file stream produces a three-page PDF."""
    source_pdf = TEST_DIR / "resources/data/pdfs/Ray.pdf"
    wanted_pages = [1, 2, 4]
    buffer = BytesIO()

    with open(source_pdf, "rb") as infile:
        select_pdf_pages(infile, buffer, wanted_pages)

    # Rewind so the freshly written PDF can be parsed from its start.
    buffer.seek(0)
    assert len(PdfReader(buffer).pages) == 3


def test_select_pdf_pages_empty():
    """An empty page list produces a PDF with no pages."""
    source_pdf = TEST_DIR / "resources/data/pdfs/Ray.pdf"
    buffer = BytesIO()

    with open(source_pdf, "rb") as infile:
        select_pdf_pages(infile, buffer, [])

    buffer.seek(0)
    written = PdfReader(buffer)
    assert len(written.pages) == 0


def test_select_pdf_pages_invalid_pages():
    """Requesting page 100 of the test PDF raises IndexError."""
    source_pdf = TEST_DIR / "resources/data/pdfs/Ray.pdf"
    buffer = BytesIO()
    # The file is opened inside the raises-context, as with nested ``with`` statements.
    with pytest.raises(IndexError), open(source_pdf, "rb") as infile:
        select_pdf_pages(infile, buffer, [1, 3, 100])


def test_select_pdf_pages_existing_reader():
    """select_pdf_pages also accepts an already-open PdfReader as its input."""
    source_pdf = TEST_DIR / "resources/data/pdfs/Ray.pdf"
    wanted_pages = [1, 2, 4]
    buffer = BytesIO()

    with PdfReader(source_pdf) as reader:
        select_pdf_pages(reader, buffer, wanted_pages)

    buffer.seek(0)
    assert len(PdfReader(buffer).pages) == 3


def test_filter_elements_by_page():
    """Check which elements filter_elements_by_page keeps and how it renumbers them.

    filter_elements_by_page rewrites ``page_number`` on the elements it keeps, so each case
    builds its own elements. Sharing one list would make later assertions run against page
    numbers already changed by earlier calls.
    """

    def make_elements() -> list[Element]:
        # Two elements on page 1, then one each on pages 2, 3 and 4.
        return [Element(properties={"page_number": n}) for n in (1, 1, 2, 3, 4)]

    result = filter_elements_by_page(make_elements(), [1])
    assert len(result) == 2 and all(e.properties["page_number"] == 1 for e in result)

    # Original pages 2 and 4 become pages 1 and 2 of the new document.
    result = filter_elements_by_page(make_elements(), [2, 4])
    assert sorted([e.properties["page_number"] for e in result]) == [1, 2]

    result = filter_elements_by_page(make_elements(), [])
    assert len(result) == 0

    # No element sits on page 5.
    result = filter_elements_by_page(make_elements(), [5])
    assert len(result) == 0


def test_select_pages():
    """Run select_pages on a real PDF and check the trimmed binary, the remapping, and the elements."""
    import copy

    path = TEST_DIR / "resources/data/pdfs/Ray.pdf"
    context = sycamore.init(exec_mode=sycamore.EXEC_LOCAL)
    docs = context.read.binary(paths=[str(path)], binary_format="pdf").take_all()

    assert len(docs) == 1
    doc = docs[0]

    doc_fn = select_pages([[1, 2], 4])

    # The returned function modifies the document it is given, so work on a copy and keep
    # the original around for the size comparison.
    doc2 = copy.deepcopy(doc)
    new_doc = doc_fn(doc2)

    assert new_doc.binary_representation is not None
    assert len(new_doc.binary_representation) < len(doc.binary_representation)
    # Original page 4 is the third page of the new document; pages 1 and 2 keep their numbers.
    assert new_doc.properties["remapped_pages"] == {3: 4}
    # Elements are renumbered to the new document's pages (1..3), not the original numbers.
    assert all(e.properties["page_number"] in [1, 2, 3] for e in new_doc.elements)
112 changes: 111 additions & 1 deletion lib/sycamore/sycamore/utils/pdf_utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
from io import BytesIO
from contextlib import nullcontext
import logging
from typing import Any, BinaryIO, Callable, cast, Union
from PIL import Image

from pypdf import PdfReader, PdfWriter

from sycamore import DocSet
from sycamore.functions.document import DrawBoxes, split_and_convert_to_image
from sycamore.utils.image_utils import show_images
from sycamore.data import Document
from sycamore.data import Document, Element
import json

logger = logging.getLogger(__name__)


def show_pages(docset: DocSet, limit: int = 2):
documents = (
Expand All @@ -22,6 +29,109 @@ def show_pages(docset: DocSet, limit: int = 2):
show_images(images)


def flatten_selected_pages(
    selected_pages: list[Union[int, list[int]]], page_count: int
) -> tuple[list[int], dict[int, int]]:
    """
    Expand a page selection into an explicit, ordered list of page numbers.

    A selection may consist of single pages (like [11]), inclusive page ranges (like [[25, 30]]),
    or a combination of both (like [11, [25, 30]]). Pages are 1-indexed.

    Args:
        selected_pages: Page numbers and/or [start, end] ranges, in the desired output order.
        page_count: Number of pages in the source document, i.e. the largest valid page number.

    Returns:
        A tuple (page_list, remapped_pages). page_list holds the individual page numbers in
        selection order. remapped_pages maps each new 1-indexed position to the original page
        number, but only for positions where the two are not equal.

    Raises:
        ValueError: If an entry is neither an integer nor a 2-element list of integers, if a
            range has start > end, if a page is selected more than once, or if a page number
            lies outside 1..page_count.
    """
    invalid_selection = "Page selection must either be an integer or a 2-element list [integer, integer]"

    page_list: list[int] = []
    present_pages: set[int] = set()
    remapped_pages: dict[int, int] = {}
    for selection in selected_pages:
        if isinstance(selection, int):
            # Treat a single page as the one-page range [n, n].
            selection = [selection, selection]
        # Check the shape up front so malformed ranges get the documented message instead of
        # an unpacking ValueError or a TypeError from range().
        if not (
            isinstance(selection, list)
            and len(selection) == 2
            and all(isinstance(bound, int) for bound in selection)
        ):
            raise ValueError(invalid_selection)

        subset_start, subset_end = selection
        if subset_end < subset_start:
            raise ValueError("For selected_pages like [a, b] it must be that a <= b.")
        for page_num in range(subset_start, subset_end + 1):
            if page_num in present_pages:
                raise ValueError("selected_pages may not include overlapping pages.")
            if page_num <= 0 or page_num > page_count:
                raise ValueError(
                    f"Invalid page number ({page_num}): for this document, "
                    f"page numbers must be at least 1 and at most {page_count}"
                )
            present_pages.add(page_num)
            page_list.append(page_num)

            # len(page_list) is this page's 1-indexed position in the new document.
            if page_num != len(page_list):
                remapped_pages[len(page_list)] = page_num

    return (page_list, remapped_pages)


def select_pdf_pages(input: Union[BinaryIO, PdfReader], out: BinaryIO, page_list: list[int]) -> None:
    """
    Copy the given pages of a PDF, in order, into a new PDF written to ``out``.

    Args:
        input: The source PDF, either a seekable binary stream or an existing PdfReader.
            A stream is rewound to its start before reading. An existing PdfReader is not
            closed here; the caller remains responsible for it.
        out: Binary stream that receives the new PDF. It is flushed after writing.
        page_list: 1-indexed page numbers to copy, in output order.

    Raises:
        IndexError: If a page number is less than 1, or (raised by the page lookup) if it is
            past the last page of the source PDF.
    """
    if isinstance(input, PdfReader):
        read_cm: Any = nullcontext(input)  # Caller is responsible for cleaning up.
    else:
        input.seek(0)
        read_cm = PdfReader(input)

    with read_cm as pdf_reader, PdfWriter() as pdf_writer:
        for page_num in page_list:
            # page_num - 1 would be negative here; if the pages sequence follows Python's
            # usual negative-index semantics that would silently pick a page counted from
            # the end, so reject it the same way a too-large page number is rejected.
            if page_num < 1:
                raise IndexError(f"Invalid page number ({page_num}): page numbers are 1-indexed")
            pdf_writer.add_page(pdf_reader.pages[page_num - 1])
        pdf_writer.write_stream(out)  # see pypdf issue #2905
        out.flush()


def filter_elements_by_page(elements: list[Element], page_numbers: list[int]) -> list[Element]:
    """
    Keep the elements located on the given pages and renumber their ``page_number``.

    Args:
        elements: Elements to filter. Kept elements have ``properties["page_number"]``
            overwritten in place.
        page_numbers: Original page numbers to keep. A page's 1-indexed position in this list
            becomes its new page number.

    Returns:
        A new list with the kept elements, in their original order. Elements whose
        ``page_number`` is missing or not in ``page_numbers`` are left out.
    """
    # Original page number -> 1-indexed position in the selection.
    new_number_for = dict(zip(page_numbers, range(1, len(page_numbers) + 1)))

    kept = []
    for element in elements:
        original_number = cast(int, element.properties.get("page_number"))
        renumbered = new_number_for.get(original_number)
        if renumbered is None:
            continue
        # Point the element at its page in the new document.
        element.properties["page_number"] = renumbered
        kept.append(element)
    return kept


def select_pages(page_selection: list[Union[int, list[int]]]) -> Callable[[Document], Document]:
    """
    Returns a function that selects pages from a PDF document based on a list of page selections.
    Each selection can be a single page number or a range of page numbers. Page numbers are 1-indexed.

    Examples:
       [1,2,3]          pages 1, 2, and 3
       [[1,3], 5]       pages 1, 2, 3, and 5
       [[1,3], [5,7]]   pages 1, 2, 3, and 5, 6, 7
       [2, 1, [4, 6]]   pages 2, 1, 4, 5, 6, in that order

    Args:
       page_selection: A list of page numbers or page ranges to select. Page numbers are 1-indexed.

    Returns:
       A function that takes a Document, modifies it in place, and returns it. The document's
       binary_representation is replaced with a PDF holding only the selected pages, its
       elements are filtered and renumbered to match, and properties["remapped_pages"] records
       the new-to-original page mapping. A document without a binary_representation is
       returned unchanged after a warning is logged.
    """

    def select_pages_fn(doc: Document) -> Document:
        if doc.binary_representation is None:
            logger.warning("No binary_representation found in doc %s. Skipping page selection.", doc.doc_id)
            return doc

        outstream = BytesIO()

        with PdfReader(BytesIO(doc.binary_representation)) as reader:
            page_count = len(reader.pages)
            page_list, remapped_pages = flatten_selected_pages(page_selection, page_count)
            select_pdf_pages(reader, outstream, page_list=page_list)

        doc.binary_representation = outstream.getvalue()
        doc.properties["remapped_pages"] = remapped_pages
        # Drop elements on unselected pages and renumber the rest for the new PDF.
        new_elements = filter_elements_by_page(doc.elements, page_list)
        doc.elements = new_elements
        return doc

    return select_pages_fn


def enumerate_images_and_tables(m_pages: list[Document]):
from IPython.display import display, HTML

Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载