This plugin implements a Neptune source for OpenSearch Ingestion (Data Prepper). It reads change data capture (CDC) events from Neptune Streams and delivers them to OpenSearch, using S3 as a partitioned intermediate store.
Neptune Streams → [Neptune Source Plugin] → S3 (partitioned by entity ID hash)
                                             ↓
                                  [S3 Source Plugin] → OpenSearch Sink
The pipeline has two sub-pipelines:
- Neptune → S3: Polls the Neptune Streams API, converts records, partitions them by entity ID hash (256 hex buckets, 00-ff), and writes them to S3
- S3 → OpenSearch: Reads the partitioned events from S3 and writes them to the OpenSearch domain
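To illustrate the polling step in the Neptune → S3 sub-pipeline, the sketch below builds Neptune Streams request URLs. The first poll starts at TRIM_HORIZON, and later polls resume after the last checkpointed (commitNum, opNum) pair. The query parameters (iteratorType, commitNum, opNum, limit) and the /propertygraph/stream and /sparql/stream paths come from the Neptune Streams REST API. The StreamCursor class is hypothetical, and the plugin itself uses the AWS SDK NeptunedataClient rather than hand-built URLs.

```java
import java.net.URI;

/** Hypothetical helper: builds Neptune Streams request URIs from a checkpoint. */
final class StreamCursor {
    private final String endpoint;   // e.g. "https://localhost:8182" through an SSH tunnel
    private final String streamType; // "propertygraph" or "sparql"

    StreamCursor(String endpoint, String streamType) {
        this.endpoint = endpoint;
        this.streamType = streamType;
    }

    /** First poll: start from the oldest record still retained in the stream. */
    URI initial(int limit) {
        return URI.create(String.format(
                "%s/%s/stream?iteratorType=TRIM_HORIZON&limit=%d",
                endpoint, streamType, limit));
    }

    /** Later polls: resume strictly after the last processed (commitNum, opNum) pair. */
    URI after(long commitNum, long opNum, int limit) {
        return URI.create(String.format(
                "%s/%s/stream?iteratorType=AFTER_SEQUENCE_NUMBER&commitNum=%d&opNum=%d&limit=%d",
                endpoint, streamType, commitNum, opNum, limit));
    }
}
```

In this sketch, a restarted worker that persisted its last (commitNum, opNum) pair would continue immediately after the last record it processed.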
Start Neptune locally from the workspace:
cd /workplace/ankeshk/neptune/
# Start Neptune with streams enabled
# Ensure neptune_streams=enabled in your Neptune configuration
Create these resources in your personal AWS account:
- S3 bucket: data-prepper-test (for stream event buffering)
- OpenSearch domain: with public access (easier for local testing)
  - Disable fine-grained access control OR set up Cognito
- DynamoDB table: DataPrepperSourceCoordinationStore (auto-created on the first run if skip_table_creation: false)
- IAM role: OSPipelineRole with permissions for S3, DynamoDB, and OpenSearch
Example IAM policy (S3 and DynamoDB statements):
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": ["arn:aws:s3:::data-prepper-test"]
},
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
"Resource": ["arn:aws:s3:::data-prepper-test/*"]
},
{
"Effect": "Allow",
"Action": ["dynamodb:*"],
"Resource": ["arn:aws:dynamodb:us-west-2:*:table/DataPrepperSourceCoordinationStore"]
}
]
}
Open an SSH tunnel to the Neptune cluster through an EC2 host (skip this if Neptune runs locally):
ssh -i "your-key.pem" -L 8182:db-neptune-1.cluster-xxx.us-west-2.neptune.amazonaws.com:8182 \
ec2-user@ec2-xx-xx-xx-xx.us-west-2.compute.amazonaws.com -N -v
Refresh AWS credentials:
ada credentials update --account "682633505281" --provider=isengard --role Admin --once
Clone Data Prepper:
git clone https://github.com/opensearch-project/data-prepper.git
cd data-prepper
Copy the neptune-source directory into data-prepper-plugins/.
Add to settings.gradle:
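include 'data-prepper-plugins:neptune-source'
Copy the plugin's sample config files into Data Prepper's config/ directory, replacing <neptune-source-config-dir> with the directory that holds those samples:
cp <neptune-source-config-dir>/neptune-pipelines.yaml config/neptune-pipelines.yaml
cp <neptune-source-config-dir>/data-prepper-config.yaml config/data-prepper-config.yaml
Edit config/neptune-pipelines.yaml: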
- Set host to your Neptune endpoint (or localhost with an SSH tunnel)
- Set iamAuth: true if Neptune has IAM auth enabled
- Set s3_bucket to your S3 bucket
- Set the OpenSearch hosts to your domain endpoint
- Set streamType to propertygraph or sparql
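One way to picture the constraints in the list above is a small settings type that rejects a missing host or bucket and any streamType other than the two supported values. This is a hypothetical sketch. NeptuneSourceSettings is not the plugin's real configuration class, and the sketch assumes the YAML keys map one-to-one onto fields.

```java
import java.util.Set;

/** Hypothetical model of the neptune-pipelines.yaml source settings. */
record NeptuneSourceSettings(String host, boolean iamAuth, String s3Bucket,
                             String openSearchHost, String streamType) {
    // The two stream types this README names.
    private static final Set<String> STREAM_TYPES = Set.of("propertygraph", "sparql");

    NeptuneSourceSettings {
        if (host == null || host.isBlank()) {
            throw new IllegalArgumentException("host must be the Neptune endpoint or localhost");
        }
        if (s3Bucket == null || s3Bucket.isBlank()) {
            throw new IllegalArgumentException("s3_bucket is required");
        }
        // Null check first: Set.of(...).contains(null) throws NullPointerException.
        if (streamType == null || !STREAM_TYPES.contains(streamType)) {
            throw new IllegalArgumentException(
                    "streamType must be propertygraph or sparql, got: " + streamType);
        }
    }
}
```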
Edit config/data-prepper-config.yaml:
- Set skip_table_creation: false for the first run
- Use a unique partition_prefix for each run (e.g., neptune-01, neptune-02)
Set the environment variables:
export AWS_REGION=us-west-2
export DATAPREPPER_SERVICE_NAME=OSI
export SOURCE_COORDINATION_PIPELINE_IDENTIFIER=neptune-01
In IntelliJ, run DataPrepperExecute.java with these arguments:
config/neptune-pipelines.yaml config/data-prepper-config.yaml
Or from the command line:
./gradlew :data-prepper-main:run --args="config/neptune-pipelines.yaml config/data-prepper-config.yaml"
Write some test data to Neptune.
Gremlin (Property Graph):
# Add a vertex
curl -X POST --data-binary '{"gremlin": "g.addV(\"person\").property(id, \"1\").property(\"name\", \"martin\")"}' \
https://localhost:8182/gremlin -k
# Add another vertex
curl -X POST --data-binary '{"gremlin": "g.addV(\"person\").property(id, \"2\").property(\"name\", \"vadas\")"}' \
https://localhost:8182/gremlin -k
# Add an edge
curl -X POST --data-binary '{"gremlin": "g.addE(\"knows\").from(__.V(\"1\")).to(__.V(\"2\")).property(\"weight\", 0.5)"}' \
https://localhost:8182/gremlin -k
# Update a property
curl -X POST --data-binary '{"gremlin": "g.V(\"1\").property(single, \"name\", \"amanda\")"}' \
https://localhost:8182/gremlin -k
# Drop a vertex
curl -X POST --data-binary '{"gremlin": "g.V(\"2\").drop()"}' \
https://localhost:8182/gremlin -k
Verify that the stream is working:
curl -k 'https://localhost:8182/propertygraph/stream?iteratorType=TRIM_HORIZON'
OpenCypher:
curl -X POST --data-binary 'query=CREATE (n:Person {name: "Charlie"})' \
https://localhost:8182/openCypher -k
SPARQL:
curl -X POST --data-binary 'update=INSERT DATA { <https://test.com/s1> <https://test.com/p1> <https://test.com/o1> . }' \
https://localhost:8182/sparql -k
Check the OpenSearch dashboard or query the index:
curl -X GET "https://your-opensearch-domain/_search?q=*&pretty"
Neptune data is stored in OpenSearch using this unified document structure:
{
"entity_id": "v://1",
"entity_type": ["person"],
"document_type": "vertex",
"predicates": {
"name": [{"value": "martin"}],
"age": [{"value": 29}]
}
}
- Vertex IDs are prefixed with v://
- Edge IDs are prefixed with e://
- Edge documents include from and to fields
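The sketch below folds simplified property-graph stream changes into the unified document shape shown above. It assumes Neptune's property-graph change types (vl for a vertex label, vp for a vertex property, e for an edge, ep for an edge property) and handles ADD changes only. GraphChange and UnifiedDocumentBuilder are hypothetical names. The document_type value "edge" and the v:// prefix on from and to are also assumptions, since the README shows only a vertex document.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** One ADD change from a property-graph stream record, with the value already unwrapped (hypothetical). */
record GraphChange(String type, String id, String key, Object value, String from, String to) {}

/** Hypothetical folding of ADD changes into the unified OpenSearch document. */
final class UnifiedDocumentBuilder {
    private final Map<String, Object> doc = new LinkedHashMap<>();
    private final List<String> entityTypes = new ArrayList<>();
    private final Map<String, List<Map<String, Object>>> predicates = new LinkedHashMap<>();

    UnifiedDocumentBuilder(boolean isEdge, String neptuneId) {
        doc.put("entity_id", (isEdge ? "e://" : "v://") + neptuneId);
        doc.put("entity_type", entityTypes);
        doc.put("document_type", isEdge ? "edge" : "vertex"); // "edge" is assumed
        doc.put("predicates", predicates);
    }

    UnifiedDocumentBuilder apply(GraphChange c) {
        switch (c.type()) {
            // Vertex label: becomes an entity_type entry.
            case "vl" -> entityTypes.add(String.valueOf(c.value()));
            // Edge: its label is the entity_type; endpoints become from/to (prefix assumed).
            case "e" -> {
                entityTypes.add(String.valueOf(c.value()));
                doc.put("from", "v://" + c.from());
                doc.put("to", "v://" + c.to());
            }
            // Vertex or edge property: appended as {"value": ...} under its key.
            case "vp", "ep" -> predicates.computeIfAbsent(c.key(), k -> new ArrayList<>())
                    .add(Map.of("value", c.value()));
            default -> throw new IllegalArgumentException("unknown change type: " + c.type());
        }
        return this;
    }

    Map<String, Object> build() {
        return doc;
    }
}
```

Applying a vl change for person and vp changes for name and age to vertex 1 yields the entity_id, entity_type, and predicates values in the example document.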
For each new test run, either:
- Use a different partition_prefix in data-prepper-config.yaml and a different SOURCE_COORDINATION_PIPELINE_IDENTIFIER
- Delete all items in the DynamoDB DataPrepperSourceCoordinationStore table
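Since both values have to change between runs, a small helper can derive the next identifier from the previous one (neptune-01 to neptune-02, and so on). RunPrefix is a hypothetical convenience that assumes the name-dash-counter pattern used in the examples. It is not part of Data Prepper.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: derives the next run identifier from the previous one. */
final class RunPrefix {
    // A base name, a dash, and a zero-padded counter, e.g. "neptune-01".
    private static final Pattern RUN = Pattern.compile("(.+)-(\\d+)");

    static String next(String previous) {
        Matcher m = RUN.matcher(previous);
        if (!m.matches()) {
            return previous + "-01"; // no counter yet: start one
        }
        String digits = m.group(2);
        int n = Integer.parseInt(digits) + 1;
        // Keep the original zero-padding width: "01" -> "02", "09" -> "10".
        return m.group(1) + "-" + String.format("%0" + digits.length() + "d", n);
    }
}
```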
- StreamRecordsNotFoundException: No new records are available. The stream worker backs off and retries.
- ConditionalCheckFailedException: Another Data Prepper instance modified the partition. This is expected in multi-node setups.
- Connection timeout: Check that the SSH tunnel is active, or use insecure: false with a proper trust store.
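The README does not specify the retry policy behind "backs off and retries". The following sketch assumes capped exponential backoff: each consecutive empty poll doubles the delay up to a ceiling, and any poll that returns records resets it. EmptyPollBackoff and the example delays (500 ms initial, 5 s cap) are hypothetical.

```java
import java.time.Duration;

/** Hypothetical capped exponential backoff for empty stream polls. */
final class EmptyPollBackoff {
    private final Duration initial;
    private final Duration max;
    private int consecutiveEmptyPolls = 0;

    EmptyPollBackoff(Duration initial, Duration max) {
        this.initial = initial;
        this.max = max;
    }

    /** Call after a poll ends in StreamRecordsNotFoundException; returns how long to wait. */
    Duration onEmptyPoll() {
        // initial * 2^n, with the shift bounded so the multiplication cannot overflow.
        int shift = Math.min(consecutiveEmptyPolls, 20);
        consecutiveEmptyPolls++;
        Duration delay = initial.multipliedBy(1L << shift);
        return delay.compareTo(max) > 0 ? max : delay;
    }

    /** Call after a poll returns records; the next empty poll starts from the initial delay. */
    void onRecords() {
        consecutiveEmptyPolls = 0;
    }
}
```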
- S3 partitioning: Entity IDs are MD5-hashed into 256 hex partitions (00-ff). The same entity always maps to the same partition, so its events are processed sequentially.
- NeptunedataClient SDK: Uses the official AWS SDK (software.amazon.awssdk:neptunedata) for stream access instead of raw HTTP, which handles IAM auth automatically.
- Source coordination: Uses DynamoDB-backed source coordination (the same mechanism as the DocumentDB plugin) for distributed partition management.
- Eventual consistency: Stream records are written with the upsert action; using Painless scripts to handle out-of-order updates is future work.
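The S3 partitioning rule can be sketched with the JDK's MessageDigest. Two details are assumptions here: which bits of the MD5 digest select the bucket, and whether the raw Neptune ID or the prefixed entity_id is hashed. This version uses the first digest byte, which yields exactly 256 values rendered as 00 through ff.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: map an entity ID to one of 256 partitions named "00".."ff". */
final class EntityPartitioner {
    static String partitionFor(String entityId) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(entityId.getBytes(StandardCharsets.UTF_8));
            // Assumption: the first digest byte picks the bucket (1 byte = 256 values).
            return String.format("%02x", digest[0] & 0xff);
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to provide MD5.
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Because the result depends only on the ID string, every event for a given entity lands in the same partition, which is the property that sequential per-entity processing relies on.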