mongopipe-core


Intro

How to store MongoDB aggregation pipelines in the database and run them?
A MongoDB aggregation pipeline is a JSON document, so store it in the database.


Why store queries in the DB?


Getting started in 3 easy steps

  1. Configuration
  2. Create JSON pipelines
  3. Run pipelines using @Store interfaces

1. Configuration

With Spring:

Maven dependency:

<dependency>
    <groupId>org.mongopipe</groupId>
    <artifactId>mongopipe-spring</artifactId>
    <version>1.0.0</version> <!-- Get latest from Maven Central or https://mvnrepository.com/artifact/org.mongopipe/mongopipe-spring -->
</dependency>

Add the two beans required for configuration and autostart:

@Bean
public MongoPipeConfig getMongoPipeConfig() {
  return MongoPipeConfig.builder()
      .uri("mongodb://...")
      .databaseName("database name")
      //.mongoClient(optionallyForCustomConnection) // e.g. for TLS setup
      .build();
}

@Bean
public MongoPipeStarter getMongoPipeStarter(MongoPipeConfig mongoPipeConfig) {
  return new MongoPipeStarter(mongoPipeConfig);
}

Without Spring:

<dependency>
  <groupId>org.mongopipe</groupId>
  <artifactId>mongopipe-core</artifactId>
  <version>1.0.1</version> <!-- Get latest from Maven Central or https://mvnrepository.com/artifact/org.mongopipe/mongopipe-core -->
</dependency>

Manually register the configuration:

Stores.registerConfig(MongoPipeConfig.builder()
  .uri("<mongo uri>")
  .databaseName("<database name>")
  //.mongoClient(optionallyForCustomConnection) // e.g. for TLS setup
  .build());

2. Create JSON pipelines

Create a JSON/BSON resource file myFirstPipeline.json that will be automatically migrated (inserted) into the database collection pipeline_store at startup.

{
 "id": "matchingPizzas",
 "collection": "pizzas",
 "pipeline": [
   {
      $match: { size: "${pizzaSize}" }
   },
   { 
      $sort : { name : 1 } 
   }
 ]
}

3. Run pipelines

Just create an interface with @PipelineRun annotated methods. This binds your code (interface methods) to pipelines stored in the DB.

@Store
public interface MyRestaurant {
  @PipelineRun("matchingPizzas") // the db pipeline id, if missing is defaulted to method name. 
  Stream<Pizza> getMatchingPizzas(String pizzaSize);      
}    

// With Spring ("mongopipe-spring" dependency):
@Autowired
MyRestaurant myRestaurant;
...
myRestaurant.getMatchingPizzas("MEDIUM");

// Without Spring:
Stores.from(MyRestaurant.class)
  .getMatchingPizzas("MEDIUM");

NOTE: See the main documentation site for more on pipeline files.

Dynamic creation and running with criteria

Instead of using an interface to define the pipeline run methods, you can manually create and run a pipeline. This is useful if you do not want a @PipelineRun annotated method for every pipeline run.
By using PipelineRunner you can run any pipeline.
By using PipelineStore and the criteria APIs, subparts of the pipeline can be constructed dynamically.
Examples:

    // Use PipelineStore for any CRUD operations on pipelines. 
    PipelineStore pipelineStore = Pipelines.getStore();
    PipelineRunner pipelineRunner = Pipelines.getRunner();

    // ********
    // You can create a pipeline dynamically or from criteria in several ways:
    // ********
        
    // ********
    // 1. Using the Mongo BSON criteria API; static imports are from the Mongo driver API classes com.mongodb.client.model.Aggregates/Filters/Sorts.
    // Criteria examples: https://www.mongodb.com/docs/drivers/java/sync/v4.7/fundamentals/crud/read-operations/
    // ********
    // Pipeline stages one by one.
    List<Bson> pipelineBson = Arrays.asList(
        // Static imports from com.mongodb.client.model.Aggregates/Filters/Sorts
        match(and(eq("size", "${size}"), eq("available", "${available}"))),
        sort(descending("price")),
        limit(3)
    );
    Pipeline dynamicPipeline = Pipeline.builder()
        .id("dynamicPipeline")
        .pipeline(pipelineBson)
        .collection("testCollection")
        .build();
    pipelineStore.create(dynamicPipeline);
    
    List<Pizza> pizzas = pipelineRunner.run("matchingPizzas", Pizza.class, Map.of("size", "medium", "available", true)) // Returns a stream
        .collect(Collectors.toList());

    
    // ********
    // 2. Replacing entire sections of a minimal static pipeline by making them variables.
    //    As parameter value provide Java types corresponding to JSON (e.g. maps, arrays) or POJO classes with getters and setters. 
    // ********
    
    // This will normally come from an outside source, provided by an admin/DBA.
    String jsonString = "{ \"_id\": \"dynamicPipeline\", \"version\": 1, \"collection\": \"testCollection\", \"pipelineAsString\":\"[" +
        "{\\\"$match\\\": {\\\"$and\\\": [{\\\"size\\\": \\\"${size}\\\"}, {\\\"available\\\": \\\"${available}\\\"}]}}," +
        "{\\\"$sort\\\": \\\"${sortMap}\\\" }," +
        "{\\\"$limit\\\": 3}]\" }";
    Pipeline pipeline = BsonUtil.toPojo(jsonString, Pipeline.class);
    pipelineStore.create(pipeline);

    // 3rd param is a map/object by itself and can be arbitrarily deep. Also an entire stage can be provided.
    // NOTE: Here we use a Map type as the value of the 'sortMap' parameter. But any POJO class of your choice can be used;
    // it just needs getters and setters in order to be convertible to JSON (BsonUtil#toBsonValue being called).
    List<Pizza> pizzas = pipelineRunner.runAndList(pipeline.getId(), Pizza.class, Map.of("size", "medium", "available", true,
        "sortMap", Map.of("price", -1, "name", 1)));

    
    // ********
    // 3. From a plain JSON/BSON String. Thus the entire pipeline can be provided by an admin/DBA.
    // ********
    String pipelineStages = BsonUtil.escapeJsonFieldValue("[" +
        "{\"$match\": {\"$and\": [{\"size\": \"${size}\"}, {\"available\": \"${available}\"}]}}," +
        "{\"$sort\": {\"price\": -1}}," +
        "{\"$limit\": 3}" +
        "]");
    String jsonString = "{ \"_id\": \"dynamicPipeline\", \"version\": 1, \"collection\": \"testCollection\"," +
        " \"pipelineAsString\":\"" + pipelineStages + "\" }";
    Pipeline pipeline = BsonUtil.toPojo(jsonString, Pipeline.class);
    pipelineStore.create(pipeline);
    List<Pizza> pizzas = pipelineRunner.run("matchingPizzas", Pizza.class, Map.of("size", "medium", "available", true))
        .collect(Collectors.toList());

    
    // ********
    // 4. Dynamically create/update a pipeline using one of the above ways, then call the dedicated (@PipelineRun annotated) method.
    // ********
    // ... create/update Pipeline manually using one of the above methods.
    pipelineStore.update(pipeline);
    ...
    myRestaurant.getMatchingPizzas("MEDIUM");



Migration

The migration is started automatically on process start when using the Spring framework (mongopipe-spring dependency required), or manually by invoking:

Pipelines.startMigration();

How pipeline changes are detected:

  1. Fast check: a global checksum is computed from the lastModifiedTime of all incoming pipelines and compared with the existing checksum in the db (saved in the previous run).
  2. Deep check: if the timestamp-based global checksum matches, migration is skipped. Otherwise, for each incoming pipeline a content-based checksum is computed and compared with the existing checksum in the db (saved in a previous run); the db pipeline is created/updated if the content checksums do not match. The latest checksums are saved at the end. A minimal sketch of this logic is shown after the notes below.

NOTE:

  1. The prior value of an updated pipeline is saved in the pipeline_store_history collection for backup purposes. This is configurable.
  2. The default pipeline migration golden source is src/main/resources/pipelines (configurable in the step 1 configuration via MongoPipeConfig#migrationConfig#pipelinesPath), so store your pipeline JSON files in that folder.
  3. The pipelines golden source is also configurable in case you want to keep track of the original pipelines within a database instead of a resources folder:
     RunContextProvider.getContext().setPipelineMigrationSource(yourDbPipelineSource);
  4. Still remember that the final destination of the pipelines (after migration) is the pipeline_store db collection, where you can update them at any time at runtime.
  5. A pipeline that was modified at runtime (in the pipeline store) but whose golden source was not updated will not be overwritten on migration. It will be overwritten only when the corresponding pipeline is updated in the golden source.
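For illustration only, here is a minimal Java sketch of the two-phase detection described above. All class and method names are hypothetical; they mirror the documented behavior, not the library's internal API.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MigrationDetectionSketch {

  // Hypothetical model of an incoming pipeline file from the golden source.
  public static class IncomingPipeline {
    String id;
    String content;
    long lastModifiedTime;
  }

  // Fast check: one global checksum over the lastModifiedTime of all incoming pipelines.
  static String globalChecksum(List<IncomingPipeline> incoming) {
    StringBuilder sb = new StringBuilder();
    incoming.forEach(p -> sb.append(p.id).append('=').append(p.lastModifiedTime).append(';'));
    return checksum(sb.toString());
  }

  // Deep check: per-pipeline content checksums compared with those saved in a previous run.
  static void migrate(List<IncomingPipeline> incoming, String savedGlobalChecksum,
      Map<String, String> savedContentChecksums) {
    if (globalChecksum(incoming).equals(savedGlobalChecksum)) {
      return; // Timestamps unchanged since the last run: skip migration entirely.
    }
    for (IncomingPipeline p : incoming) {
      String contentChecksum = checksum(p.content);
      if (!Objects.equals(contentChecksum, savedContentChecksums.get(p.id))) {
        // Here the real migration would create/update the pipeline in pipeline_store
        // and archive the prior value into pipeline_store_history.
        savedContentChecksums.put(p.id, contentChecksum);
      }
    }
    // Finally the latest checksums would be persisted for the next run.
  }

  static String checksum(String input) {
    try {
      return Base64.getEncoder().encodeToString(
          MessageDigest.getInstance("SHA-256").digest(input.getBytes(StandardCharsets.UTF_8)));
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}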

CRUD stores

NOTE: This is subject to change.
A @Store annotated interface can support both @PipelineRun methods and CRUD methods by naming convention.
For CRUD methods by naming convention (similar to Spring Data), the method signature must match one of the methods from org.mongopipe.core.store.CrudStore. E.g.:

@Store(
    items = {
        @Item(type = Pizza.class, collection = "pizzas")
    }
)
public interface PizzaRestaurant {
  Pizza save(Pizza pizza);
  Pizza findById(String id);
  Iterable<Pizza> findAll();
  Long count();
}
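
Usage is then analogous to pipeline stores. A small sketch, assuming the configuration from step 1 is registered and that Pizza is a plain POJO with getters and setters (setName below is an assumed setter):

// Obtain the store implementation (with Spring you could @Autowired it instead).
PizzaRestaurant restaurant = Stores.from(PizzaRestaurant.class);

Pizza pizza = new Pizza();
pizza.setName("Margherita"); // Assumed setter on the Pizza POJO.
restaurant.save(pizza);      // Persisted into the "pizzas" collection per the @Item mapping.

Iterable<Pizza> allPizzas = restaurant.findAll();
Long total = restaurant.count();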

NOTE:

  1. The store (via the @Store annotation) decides where to put the items, not vice versa, meaning an item type class is almost storage agnostic. Thus, the @Store#items annotation field acts as a database mapping definition recipe. Another benefit of this mapping definition is that it can be stored in the db or in a file, referenced by id, or defaulted to all stores.
  2. This feature is not yet mature; the main feature remains managing and running pipelines.

More examples

Find more examples in the samples repo.

Data update operations

For performing data updates on MongoDB documents there are several options:

  1. Without pipelines, you can use CRUD stores. These still offer only a limited set of operations.
  2. With pipelines, using update stages such as $replaceRoot (see the sketch after this list).
  3. With pipelines, using dedicated commands such as findOneAndUpdate, which can be run by setting Pipeline#commandOptions. findOneAndUpdate also allows inserting the document if it does not exist.
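
For context on option 2, update stages are executed through MongoDB's pipeline-style updates. The plain Java driver call below is a sketch of that underlying mechanism (the collection and field names are illustrative); it does not show how mongopipe wires this through the pipeline store:

// Static import from com.mongodb.client.model.Filters; 'database' is the
// com.mongodb.client.MongoDatabase behind your configuration.
// A pipeline-style update passes a list of update stages ($set, $unset,
// $replaceRoot, ...) instead of a classic update document.
MongoCollection<Document> pizzas = database.getCollection("pizzas");
pizzas.updateMany(
    eq("size", "MEDIUM"),
    Arrays.asList(Document.parse("{ $set: { available: true } }")));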

Docs

Main documentation site: https://www.mongopipe.org
JavaDoc.

TODO

Support and get in touch

Contributions and suggestions are welcome from everyone. If you have a bug or a proposal, contact us directly or browse the open issues and create a new one.
We like direct discussions. Check the email addresses on the GitHub profiles of the committers.