How to store MongoDB aggregation pipelines in the database and run them?
A MongoDB aggregation pipeline is a JSON document, so it can be stored in the database itself.
Step 1: Configuration. With Spring, add the mongopipe-spring Maven dependency:
<dependency>
<groupId>org.mongopipe</groupId>
<artifactId>mongopipe-spring</artifactId>
<version>1.0.0</version> <!-- Get latest from Maven Central or https://mvnrepository.com/artifact/org.mongopipe/mongopipe-spring -->
</dependency>
Then add the two beans required for configuration and automatic startup:
@Bean
public MongoPipeConfig getMongoPipeConfig() {
return MongoPipeConfig.builder()
.uri("mongodb://...")
.databaseName("database name")
//.mongoClient(optionallyForCustomConnection) // e.g. for TLS setup
.build();
}
@Bean
public MongoPipeStarter getMongoPipeStarter(MongoPipeConfig mongoPipeConfig) {
return new MongoPipeStarter(mongoPipeConfig);
}
Without Spring, add the core dependency instead:
<dependency>
<groupId>org.mongopipe</groupId>
<artifactId>mongopipe-core</artifactId>
<version>1.0.1</version> <!-- Get latest from Maven Central or https://mvnrepository.com/artifact/org.mongopipe/mongopipe-core -->
</dependency>
Then manually register the configuration:
Stores.registerConfig(MongoPipeConfig.builder()
.uri("<mongo uri>")
.databaseName("<database name>")
//.mongoClient(optionallyForCustomConnection) // e.g. for TLS setup
.build());
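If you need a custom connection (e.g. for TLS), you can build the MongoClient yourself with the plain MongoDB Java driver and pass it through the builder's mongoClient(...) option shown above. A minimal sketch; whether the uri can then be omitted depends on your mongopipe version, so treat that detail as an assumption:
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

// Plain MongoDB driver setup with TLS enabled.
MongoClientSettings settings = MongoClientSettings.builder()
    .applyConnectionString(new ConnectionString("mongodb://..."))
    .applyToSslSettings(ssl -> ssl.enabled(true))
    .build();
MongoClient mongoClient = MongoClients.create(settings);

Stores.registerConfig(MongoPipeConfig.builder()
    .mongoClient(mongoClient) // custom connection (assumption: uri not needed when a client is provided)
    .databaseName("<database name>")
    .build());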
Step 2: Create your pipelines. Create a JSON/BSON resource file, e.g. myFirstPipeline.json, that will be automatically migrated (inserted) into the database collection pipeline_store at startup:
{
"id": "matchingPizzas",
"collection": "pizzas",
"pipeline": [
{
"$match": { "size": "${pizzaSize}" }
},
{
"$sort": { "name": 1 }
}
]
}
Store the pipeline files under src/main/resources/pipelines (configurable via MongoPipeConfig#migrationConfig#pipelinesPath). At startup they are migrated into the pipeline_store collection. Any future changes to the pipeline files will be detected and reflected in the database during the startup migration check. Without Spring, trigger the migration manually with Pipelines.startMigration().
Step 3: Run the pipelines. Create an interface with @PipelineRun annotated methods. This binds code (interface methods) to pipelines stored in the database:
@Store
public interface MyRestaurant {
@PipelineRun("matchingPizzas") // The db pipeline id; if missing, it defaults to the method name.
Stream<Pizza> getMatchingPizzas(String pizzaSize);
}
// With Spring ("mongopipe-spring" dependency):
@Autowired
MyRestaurant myRestaurant;
...
myRestaurant.getMatchingPizzas("MEDIUM");
// Without Spring:
Stores.from(MyRestaurant.class)
.getMatchingPizzas("MEDIUM");
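The Pizza result type used above is a plain POJO that you define yourself. A minimal sketch, assuming field names that match the documents in the pizzas collection; getters and setters are needed for JSON/BSON conversion:
public class Pizza {
  private String id;
  private String name;
  private String size;
  private Double price;
  private Boolean available;

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public String getSize() { return size; }
  public void setSize(String size) { this.size = size; }
  public Double getPrice() { return price; }
  public void setPrice(Double price) { this.price = price; }
  public Boolean getAvailable() { return available; }
  public void setAvailable(Boolean available) { this.available = available; }
}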
NOTE:
- The @Store class annotation is mandatory. The @PipelineRun annotation is optional; if missing, the pipeline id defaults to the method name.
- To run a pipeline generically, without a @Store annotated interface, you can use the Pipelines.getRunner().run method. More here: Generic creation and running. You can use this behind a REST API to generically create and run pipelines (see the sketch after this note).
- To bind a method parameter to a pipeline template parameter by name, annotate the method parameter with @Param and provide the template parameter name: List<Pizza> getMatchingPizzas(@Param("pizzaSize") String pizzaSize).
- Stored pipelines live in the pipeline_store collection and can be managed with the PipelineStore API.
- Pipeline parameters use the "${paramName}" syntax and must be quoted (e.g. "..": "${paramName}") in order to be valid JSON.
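For example, a minimal sketch of a Spring endpoint that generically runs any stored pipeline by id; the controller, path, request shape, and the use of Map.class as the result type are illustrative assumptions, not part of mongopipe:
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PipelineRunController {
  // Runs the stored pipeline with the posted template parameters and returns the raw documents.
  @PostMapping("/pipelines/{id}/run")
  public List<Map> run(@PathVariable String id, @RequestBody Map<String, Object> params) {
    return Pipelines.getRunner()
        .run(id, Map.class, params) // returns a Stream, as in the examples below
        .collect(Collectors.toList());
  }
}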
On pipeline run, the actual parameter values can be of any type, including complex types (lists, maps, POJOs), as long as they can be converted to a JSON/BSON type. For example, if an integer value of 10 is provided for paramName, then "x": "${paramName}" becomes "x": 10.
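A minimal sketch of passing a non-string value; the pipeline id and parameter name are illustrative:
// Assuming a stored pipeline containing "x": "${paramName}", passing an
// Integer makes the stage render as "x": 10 at run time.
Pipelines.getRunner()
    .run("myPipeline", Pizza.class, Map.of("paramName", 10))
    .collect(Collectors.toList());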
Sometimes, instead of using an interface to define the pipeline run methods, you can manually create and run a pipeline.
This is useful if you do not want to create @PipelineRun
annotated methods for every pipeline.
By using PipelineRunner
you can run any pipeline.
By using PipelineStore
and criteria APIs, subparts of the pipeline can be constructed dynamically.
Examples:
// Use PipelineStore for any CRUD operations on pipelines.
PipelineStore pipelineStore = Pipelines.getStore();
PipelineRunner pipelineRunner = Pipelines.getRunner();
// ********
// You can create a pipeline dynamically or from criterias in several ways:
// ********
// ********
// 1. Using the Mongo BSON criteria API; static imports are from the Mongo driver API classes com.mongodb.client.model.Aggregates/Filters/Sorts.
// Criteria examples: https://www.mongodb.com/docs/drivers/java/sync/v4.7/fundamentals/crud/read-operations/
// ********
// Pipeline stages one by one.
List<Bson> pipelineBson = Arrays.asList(
// Static imports from com.mongodb.client.model.Aggregates/Filters/Sorts
match(and(eq("size", "${size}"), eq("available", "${available}"))),
sort(descending("price")),
limit(3)
);
Pipeline dynamicPipeline = Pipeline.builder()
.id("dynamicPipeline")
.pipeline(pipelineBson)
.collection("testCollection")
.build();
pipelineStore.create(dynamicPipeline);
List<Pizza> pizzas = pipelineRunner.run("dynamicPipeline", Pizza.class, Map.of("size", "medium", "available", true)) // Returns a stream
.collect(Collectors.toList());
// ********
// 2. Replacing entire sections of a minimal static pipeline by making them variables.
// As parameter value provide Java types corresponding to JSON (e.g. maps, arrays) or POJO classes with getters and setters.
// ********
// This will normally come from an outside source, provided by an admin/DBA.
String jsonString = "{ \"_id\": \"dynamicPipeline\", \"version\": 1, \"collection\": \"testCollection\", \"pipelineAsString\":\"[" +
"{\\\"$match\\\": {\\\"$and\\\": [{\\\"size\\\": \\\"${size}\\\"}, {\\\"available\\\": \\\"${available}\\\"}]}}," +
"{\\\"$sort\\\": \\\"${sortMap}\\\" }," +
"{\\\"$limit\\\": 3}]\" }";
Pipeline pipeline = BsonUtil.toPojo(jsonString, Pipeline.class);
pipelineStore.create(pipeline);
// 3rd param is a map/object by itself and can be arbitrarily deep. Also an entire stage can be provided.
// NOTE: Here we use a Map type as the parameter value for the 'sortMap' parameter. But any POJO class of your choice can be used;
// it needs just getters and setters in order to be convertible to JSON (BsonUtil#toBsonValue being called).
List<Pizza> pizzas = pipelineRunner.runAndList(pipeline.getId(), Pizza.class, Map.of("size", "medium", "available", true,
"sortMap", Map.of("price", -1, "name", 1)));
// ********
// 3. From a plain JSON/BSON String. Thus the entire pipeline can be provided by an admin/DBA.
// ********
String pipelineStages = BsonUtil.escapeJsonFieldValue("[" +
"{\"$match\": {\"$and\": [{\"size\": \"${size}\"}, {\"available\": \"${available}\"}]}}," +
"{\"$sort\": {\"price\": -1}}," +
"{\"$limit\": 3}" +
"]");
String jsonString = "{ \"_id\": \"dynamicPipeline\", \"version\": 1, \"collection\": \"testCollection\"," +
" \"pipelineAsString\":\"" + pipelineStages + "\" }";
Pipeline pipeline = BsonUtil.toPojo(jsonString, Pipeline.class);
pipelineStore.create(pipeline);
List<Pizza> pizzas = pipelineRunner.run("dynamicPipeline", Pizza.class, Map.of("size", "medium", "available", true))
.collect(Collectors.toList());
// ********
// 4. Dynamically create/update a pipeline using one of the above ways, then call the dedicated (@PipelineRun annotated) method.
// ********
// ... create/update Pipeline manually using one of the above methods.
pipelineStore.update(pipeline);
...
myRestaurant.getMatchingPizzas("MEDIUM");
NOTE:
- Pipelines.getStore() can also be used to create, update and delete pipelines.
- After updating a pipeline (through the PipelineStore), you can continue to use both @PipelineRun annotated methods and the PipelineRunner to run it.
- Any parameter value just needs to be convertible to BSON (BsonUtil#toBsonValue being called on it).
The migration is started automatically on process start when using the Spring framework (mongopipe-spring dependency required), or manually by invoking:
Pipelines.startMigration();
How pipeline changes are detected:
- A checksum is computed over the lastModifiedTime of all incoming pipelines, and this checksum is compared with the existing one in the db (saved in the previous run).
- Replaced pipelines are saved in the pipeline_store_history collection for backup purposes. This is configurable.
- The default migration source is src/main/resources/pipelines (configurable in the step 1 configuration via MongoPipeConfig#migrationConfig#pipelinesPath; see the sketch below), so store your pipeline JSON files in that folder. A custom migration source can be set with RunContextProvider.getContext().setPipelineMigrationSource(yourDbPipelineSource);
Still remember that the final destination of the pipelines (after migration) is the pipeline_store db collection, where you can update them at any time at runtime.
A pipeline that was modified at runtime (in the pipeline store) but did not have its golden source updated will not be overwritten on migration. It will only be overwritten when the corresponding pipeline is updated in the golden source. NOTE: This is subject to change.
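For instance, a hedged sketch of pointing the migration at a different pipelines folder; the MigrationConfig builder and pipelinesPath setter shown here are assumptions based on the MongoPipeConfig#migrationConfig#pipelinesPath notation above, so check your mongopipe version for the exact API:
Stores.registerConfig(MongoPipeConfig.builder()
    .uri("<mongo uri>")
    .databaseName("<database name>")
    .migrationConfig(MigrationConfig.builder()     // assumed builder
        .pipelinesPath("custom/pipelines/folder")  // assumed setter matching #pipelinesPath
        .build())
    .build());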
A @Store annotated interface can support both @PipelineRun methods and CRUD methods by naming convention.
For CRUD methods by naming convention (similar to Spring Data), the method signature must match one of the methods from org.mongopipe.core.store.CrudStore.
E.g.:
@Store(
items = {
@Item(type = Pizza.class, collection = "pizzas")
}
)
public interface PizzaRestaurant {
Pizza save(Pizza pizza);
Pizza findById(String id);
Iterable<Pizza> findAll();
Long count();
}
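A minimal usage sketch; it assumes Pizza exposes an id getter, and with Spring you would @Autowire the store instead:
PizzaRestaurant restaurant = Stores.from(PizzaRestaurant.class);
Pizza saved = restaurant.save(new Pizza());        // insert or update
Pizza found = restaurant.findById(saved.getId());  // lookup by id (assumes Pizza#getId)
Long total = restaurant.count();                   // count documents in the "pizzas" collection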
NOTE:
The store (via the @Store annotation) decides where to put the items, not vice versa, meaning an item type class is almost storage agnostic.
Thus, the @Store#items annotation field acts as a database mapping definition recipe.
Another benefit of this mapping definition is that it can be stored in the db or in a file, referenced by id, or defaulted to all stores.
Find more examples in the samples repo.
For performing data updates on MongoDB documents there are several options:
- Use an update-type aggregation pipeline containing update stages such as $replaceRoot.
- Use the findOneAndUpdate command, configured via Pipeline#commandOptions. findOneAndUpdate also allows inserting the document if it does not exist.
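For context, a plain MongoDB Java driver sketch of an update that uses an aggregation pipeline (server 4.2+); the database handle and filter are illustrative, and this is driver API, not mongopipe's:
import static com.mongodb.client.model.Filters.eq;
import java.util.Arrays;
import org.bson.Document;
import com.mongodb.client.MongoCollection;

MongoCollection<Document> pizzas = database.getCollection("pizzas"); // 'database' is an assumed MongoDatabase handle
pizzas.updateOne(
    eq("name", "Margherita"),
    Arrays.asList(new Document("$set", new Document("available", true)))); // pipeline-style update stage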
Main documentation site: https://www.mongopipe.org. JavaDoc.
Contributions and suggestions are welcome from everyone. If you have a bug or proposal, contact us directly or browse the open issues and create a new one.
We like direct discussions; check the email addresses on the GitHub profiles of the committers.