Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Extensions.DataIngestion;

/// <summary>
/// Reads source content and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
/// <typeparam name="TSource">The type of the source to read from, for example <see cref="FileInfo"/> or <see cref="Stream"/>.</typeparam>
public interface IIngestionDocumentReader<TSource>
{
// NOTE(review): the exception tag below lists ArgumentNullException for an *empty* identifier as well;
// an empty string presumably surfaces as ArgumentException instead - confirm against the guard helper
// used by the implementations and adjust the tag if so.
/// <summary>
/// Reads a source and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
/// <param name="source">The source to read.</param>
/// <param name="identifier">The unique identifier for the document.</param>
/// <param name="mediaType">The media type of the source, or <see langword="null"/> (the default) when the caller does not supply one.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task whose result is the <see cref="IngestionDocument"/> produced from <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="identifier"/> is <see langword="null"/> or empty.</exception>
Task<IngestionDocument> ReadAsync(TSource source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Microsoft.Extensions.DataIngestion;
/// <summary>
/// Reads source content and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
public abstract class IngestionDocumentReader
public abstract class IngestionDocumentReader : IIngestionDocumentReader<FileInfo>, IIngestionDocumentReader<Stream>
{
/// <summary>
/// Reads a file and converts it to an <see cref="IngestionDocument"/>.
Expand All @@ -24,7 +24,7 @@ public abstract class IngestionDocumentReader
public Task<IngestionDocument> ReadAsync(FileInfo source, CancellationToken cancellationToken = default)
{
string identifier = Throw.IfNull(source).FullName; // entire path is more unique than just part of it.
return ReadAsync(source, identifier, GetMediaType(source), cancellationToken);
return ReadAsync(source, identifier, source.GetMediaType(), cancellationToken);
}

/// <summary>
Expand All @@ -42,7 +42,7 @@ public virtual async Task<IngestionDocument> ReadAsync(FileInfo source, string i
_ = Throw.IfNullOrEmpty(identifier);

using FileStream stream = new(source.FullName, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 1, FileOptions.Asynchronous);
return await ReadAsync(stream, identifier, string.IsNullOrEmpty(mediaType) ? GetMediaType(source) : mediaType!, cancellationToken).ConfigureAwait(false);
return await ReadAsync(stream, identifier, string.IsNullOrEmpty(mediaType) ? source.GetMediaType() : mediaType!, cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -53,90 +53,5 @@ public virtual async Task<IngestionDocument> ReadAsync(FileInfo source, string i
/// <param name="mediaType">The media type of the content.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous read operation.</returns>
public abstract Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default);

private static string GetMediaType(FileInfo source)
=> source.Extension switch
{
".123" => "application/vnd.lotus-1-2-3",
".602" => "application/x-t602",
".abw" => "application/x-abiword",
".bmp" => "image/bmp",
".cgm" => "image/cgm",
".csv" => "text/csv",
".cwk" => "application/x-cwk",
".dbf" => "application/vnd.dbf",
".dif" => "application/x-dif",
".doc" => "application/msword",
".docm" => "application/vnd.ms-word.document.macroEnabled.12",
".docx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".dot" => "application/msword",
".dotm" => "application/vnd.ms-word.template.macroEnabled.12",
".dotx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
".epub" => "application/epub+zip",
".et" => "application/vnd.ms-excel",
".eth" => "application/ethos",
".fods" => "application/vnd.oasis.opendocument.spreadsheet",
".gif" => "image/gif",
".htm" => "text/html",
".html" => "text/html",
".hwp" => "application/x-hwp",
".jpeg" => "image/jpeg",
".jpg" => "image/jpeg",
".key" => "application/x-iwork-keynote-sffkey",
".lwp" => "application/vnd.lotus-wordpro",
".mcw" => "application/macwriteii",
".mw" => "application/macwriteii",
".numbers" => "application/x-iwork-numbers-sffnumbers",
".ods" => "application/vnd.oasis.opendocument.spreadsheet",
".pages" => "application/x-iwork-pages-sffpages",
".pbd" => "application/x-pagemaker",
".pdf" => "application/pdf",
".png" => "image/png",
".pot" => "application/vnd.ms-powerpoint",
".potm" => "application/vnd.ms-powerpoint.template.macroEnabled.12",
".potx" => "application/vnd.openxmlformats-officedocument.presentationml.template",
".ppt" => "application/vnd.ms-powerpoint",
".pptm" => "application/vnd.ms-powerpoint.presentation.macroEnabled.12",
".pptx" => "application/vnd.openxmlformats-officedocument.presentationml.presentation",
".prn" => "application/x-prn",
".qpw" => "application/x-quattro-pro",
".rtf" => "application/rtf",
".sda" => "application/vnd.stardivision.draw",
".sdd" => "application/vnd.stardivision.impress",
".sdp" => "application/sdp",
".sdw" => "application/vnd.stardivision.writer",
".sgl" => "application/vnd.stardivision.writer",
".slk" => "text/vnd.sylk",
".sti" => "application/vnd.sun.xml.impress.template",
".stw" => "application/vnd.sun.xml.writer.template",
".svg" => "image/svg+xml",
".sxg" => "application/vnd.sun.xml.writer.global",
".sxi" => "application/vnd.sun.xml.impress",
".sxw" => "application/vnd.sun.xml.writer",
".sylk" => "text/vnd.sylk",
".tiff" => "image/tiff",
".tsv" => "text/tab-separated-values",
".txt" => "text/plain",
".uof" => "application/vnd.uoml+xml",
".uop" => "application/vnd.openofficeorg.presentation",
".uos1" => "application/vnd.uoml+xml",
".uos2" => "application/vnd.uoml+xml",
".uot" => "application/x-uo",
".vor" => "application/vnd.stardivision.writer",
".webp" => "image/webp",
".wpd" => "application/wordperfect",
".wps" => "application/vnd.ms-works",
".wq1" => "application/x-lotus",
".wq2" => "application/x-lotus",
".xls" => "application/vnd.ms-excel",
".xlsb" => "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
".xlsm" => "application/vnd.ms-excel.sheet.macroEnabled.12",
".xlr" => "application/vnd.ms-works",
".xlsx" => "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".xlw" => "application/vnd.ms-excel",
".xml" => "application/xml",
".zabw" => "application/x-abiword",
_ => "application/octet-stream"
};
public abstract Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
<NoWarn>$(NoWarn);S1694</NoWarn>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Microsoft.Extensions.DataIngestion\Utils\MediaTypeProvider.cs" Link="MediaTypeProvider.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Memory" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Extensions.DataIngestion;
/// <summary>
/// Reads documents by converting them to Markdown using the <see href="https://github.com/microsoft/markitdown">MarkItDown</see> MCP server.
/// </summary>
public class MarkItDownMcpReader : IngestionDocumentReader
public class MarkItDownMcpReader : IngestionDocumentReader, IIngestionDocumentReader<DataContent>
{
private readonly Uri _mcpServerUri;
private readonly McpClientOptions? _options;
Expand Down Expand Up @@ -65,7 +65,7 @@ public override async Task<IngestionDocument> ReadAsync(FileInfo source, string
}

/// <inheritdoc/>
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default)
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);
Expand All @@ -79,10 +79,18 @@ public override async Task<IngestionDocument> ReadAsync(Stream source, string id
#endif
DataContent dataContent = new(
ms.GetBuffer().AsMemory(0, (int)ms.Length),
string.IsNullOrEmpty(mediaType) ? "application/octet-stream" : mediaType);
string.IsNullOrEmpty(mediaType) ? "application/octet-stream" : mediaType!);

string markdown = await ConvertToMarkdownAsync(dataContent, cancellationToken).ConfigureAwait(false);
return await ReadAsync(dataContent, identifier, mediaType, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
/// <remarks>
/// The content is converted to Markdown and the Markdown is then parsed into the resulting document.
/// The <paramref name="mediaType"/> argument is not read by this overload.
/// </remarks>
public async Task<IngestionDocument> ReadAsync(DataContent source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
    // Guards run in the same order as before: source first, then identifier.
    _ = Throw.IfNull(source);
    _ = Throw.IfNullOrEmpty(identifier);

    return MarkdownParser.Parse(
        await ConvertToMarkdownAsync(source, cancellationToken).ConfigureAwait(false),
        identifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public override async Task<IngestionDocument> ReadAsync(FileInfo source, string

/// <inheritdoc/>
/// <remarks>The contents of <paramref name="source"/> are copied to a temporary file.</remarks>
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default)
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using Microsoft.Extensions.DataIngestion;
IngestionDocumentReader reader =
new MarkItDownReader(new FileInfo(@"pathToMarkItDown.exe"), extractImages: true);

using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
using IngestionPipeline<FileInfo, string> pipeline = new(reader, CreateChunker(), CreateWriter());
```

### Creating a MarkItDownMcpReader for Data Ingestion (MCP Server)
Expand All @@ -44,7 +44,7 @@ using Microsoft.Extensions.DataIngestion;
IngestionDocumentReader reader =
new MarkItDownMcpReader(new Uri("http://localhost:3001/mcp"));

using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
using IngestionPipeline<FileInfo, string> pipeline = new(reader, CreateChunker(), CreateWriter());
```

The MarkItDown MCP server can be run using Docker:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public override async Task<IngestionDocument> ReadAsync(FileInfo source, string
}

/// <inheritdoc/>
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default)
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ using Microsoft.Extensions.DataIngestion;

IngestionDocumentReader reader = new MarkdownReader();

using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
using IngestionPipeline<FileInfo, string> pipeline = new(reader, CreateChunker(), CreateWriter());
```

## Feedback & Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal static class ProcessFiles

// Diagnostic names related to processing a single source.
// NOTE(review): the call sites are outside this view - presumably these are passed to
// ActivitySource.StartActivity / Activity.SetTag like the sibling constant holders; confirm.
internal static class ProcessSource
{
// Activity name string: "ProcessSource".
internal const string ActivityName = "ProcessSource";
// Tag key string: "rag.document.id" (by its name, the tag carrying the document identifier).
internal const string DocumentIdTagName = "rag.document.id";
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
using static Microsoft.Extensions.DataIngestion.DiagnosticsConstants;

namespace Microsoft.Extensions.DataIngestion;

#pragma warning disable IDE0058 // Expression value is never used
#pragma warning disable IDE0063 // Use simple 'using' statement
#pragma warning disable CA1031 // Do not catch general exception types

/// <summary>
/// Provides a set of File System extension methods for the <see cref="IngestionPipeline{FileInfo, TChunk}"/> class.
/// </summary>
/// <remarks>
/// The public <c>ProcessAsync</c> overloads validate their arguments when they are called.
/// The actual work (starting activities, enumerating and processing files) begins only
/// when the returned sequence is enumerated.
/// </remarks>
public static class FileSystemIngestionExtensions
{
    /// <summary>
    /// Processes all files in the specified directory that match the given search pattern and option.
    /// </summary>
    /// <typeparam name="TChunk">The type of the chunk content.</typeparam>
    /// <param name="pipeline">The ingestion pipeline.</param>
    /// <param name="directory">The directory to process.</param>
    /// <param name="searchPattern">The search pattern for file selection.</param>
    /// <param name="searchOption">The search option for directory traversal.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>An asynchronous sequence that yields one <see cref="IngestionResult"/> per enumerated file.</returns>
    public static IAsyncEnumerable<IngestionResult> ProcessAsync<TChunk>(
        this IngestionPipeline<FileInfo, TChunk> pipeline,
        DirectoryInfo directory, string searchPattern = "*.*",
        SearchOption searchOption = SearchOption.TopDirectoryOnly,
        CancellationToken cancellationToken = default)
    {
        // This method is intentionally NOT an async iterator: guards placed inside an iterator body
        // only run on the first MoveNextAsync, so invalid arguments would surface far from the call site.
        Throw.IfNull(pipeline);
        Throw.IfNull(directory);
        Throw.IfNullOrEmpty(searchPattern);
        Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);

        return ProcessDirectoryCoreAsync(pipeline, directory, searchPattern, searchOption, cancellationToken);
    }

    /// <summary>
    /// Processes the specified files.
    /// </summary>
    /// <typeparam name="TChunk">The type of the chunk content.</typeparam>
    /// <param name="pipeline">The ingestion pipeline.</param>
    /// <param name="files">The collection of files to process.</param>
    /// <param name="cancellationToken">The cancellation token for the operation.</param>
    /// <returns>An asynchronous sequence that yields one <see cref="IngestionResult"/> per file in <paramref name="files"/>.</returns>
    public static IAsyncEnumerable<IngestionResult> ProcessAsync<TChunk>(
        this IngestionPipeline<FileInfo, TChunk> pipeline,
        IEnumerable<FileInfo> files,
        CancellationToken cancellationToken = default)
    {
        // Validate eagerly; see the directory overload for the rationale.
        Throw.IfNull(pipeline);
        Throw.IfNull(files);

        return ProcessFilesCoreAsync(pipeline, files, cancellationToken);
    }

    // Iterator behind the directory overload: opens the root activity, tags and logs the request,
    // then streams the per-file results.
    private static async IAsyncEnumerable<IngestionResult> ProcessDirectoryCoreAsync<TChunk>(
        IngestionPipeline<FileInfo, TChunk> pipeline,
        DirectoryInfo directory, string searchPattern, SearchOption searchOption,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        using (Activity? rootActivity = pipeline.ActivitySource.StartActivity(ProcessDirectory.ActivityName))
        {
            rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName)
                .SetTag(ProcessDirectory.SearchPatternTagName, searchPattern)
                .SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
            pipeline.Logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);

            var files = directory.EnumerateFiles(searchPattern, searchOption);
            await foreach (var ingestionResult in pipeline.ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
            {
                yield return ingestionResult;
            }
        }
    }

    // Iterator behind the file-collection overload: opens the root activity and streams the per-file results.
    private static async IAsyncEnumerable<IngestionResult> ProcessFilesCoreAsync<TChunk>(
        IngestionPipeline<FileInfo, TChunk> pipeline,
        IEnumerable<FileInfo> files,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        using (Activity? rootActivity = pipeline.ActivitySource.StartActivity(ProcessFiles.ActivityName))
        {
            await foreach (var ingestionResult in pipeline.ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
            {
                yield return ingestionResult;
            }
        }
    }

    // Shared per-file loop: records the file count when it is available without enumerating,
    // then processes each file under its own activity and yields one result per file.
    private static async IAsyncEnumerable<IngestionResult> ProcessAsync<TChunk>(
        this IngestionPipeline<FileInfo, TChunk> pipeline,
        IEnumerable<FileInfo> files, Activity? rootActivity,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
#if NET
        if (System.Linq.Enumerable.TryGetNonEnumeratedCount(files, out int count))
#else
        if (files is IReadOnlyCollection<FileInfo> { Count: int count })
#endif
        {
            rootActivity?.SetTag(ProcessFiles.FileCountTagName, count);
            pipeline.Logger?.LogFileCount(count);
        }

        foreach (FileInfo fileInfo in files)
        {
            // The per-file activity is parented explicitly to the root activity's context.
            using (Activity? processFileActivity = pipeline.ActivitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
            {
                processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);

                Exception? failure = null;
                IngestionDocument? document = null;

                try
                {
                    // The file's full path is used as the document identifier.
                    document = await pipeline.ProcessAsync(fileInfo, fileInfo.FullName, fileInfo.GetMediaType(), cancellationToken).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    // Any exception is captured and reported through the yielded result
                    // instead of ending the enumeration, so the loop moves on to the next file.
                    failure = e;
                }

                // When no document was produced, the file path stands in as the identifier.
                string documentId = document?.Identifier ?? fileInfo.FullName;
                yield return new IngestionResult(documentId, document, failure);
            }
        }
    }
}
Loading
Loading