Aggregation in Spring Data MongoDB

Aggregation in MongoDB was built to process data and return computed results. Data is processed in stages and the output of one stage is provided as input to the next stage.

Aggregation Using MongoTemplate

{
    "_id" : "01001",
    "city" : "AGAWAM",
    "loc" : [
        -72.622739,
        42.070206
    ],
    "pop" : 15338,
    "state" : "MA"
}

Get All the States With a Population Greater Than 10 Million Order by Population Descending

GroupOperation groupByStateAndSumPop = group("state")
  .sum("pop").as("statePop");
MatchOperation filterStates = match(new Criteria("statePop").gt(10000000));
SortOperation sortByPopDesc = sort(Sort.by(Direction.DESC, "statePop"));

Aggregation aggregation = newAggregation(
  groupByStateAndSumPop, filterStates, sortByPopDesc);
AggregationResults<StatePopulation> result = mongoTemplate.aggregate(
  aggregation, "zips", StatePopulation.class);

The expected output will have a field _id as state and a field statePop with the total state population:

public class StatePoulation {
 
    @Id
    private String state;
    private Integer statePop;
 
    // standard getters and setters
}

The @Id annotation will map the _id field from output to state in the model:

Get Smallest State by Average City Population

GroupOperation sumTotalCityPop = group("state", "city")
  .sum("pop").as("cityPop");
GroupOperation averageStatePop = group("_id.state")
  .avg("cityPop").as("avgCityPop");
SortOperation sortByAvgPopAsc = sort(Sort.by(Direction.ASC, "avgCityPop"));
LimitOperation limitToOnlyFirstDoc = limit(1);
ProjectionOperation projectToMatchModel = project()
  .andExpression("_id").as("state")
  .andExpression("avgCityPop").as("statePop");

Aggregation aggregation = newAggregation(
  sumTotalCityPop, averageStatePop, sortByAvgPopAsc,
  limitToOnlyFirstDoc, projectToMatchModel);

AggregationResults<StatePopulation> result = mongoTemplate
  .aggregate(aggregation, "zips", StatePopulation.class);
StatePopulation smallestState = result.getUniqueMappedResult();

Get the State With Maximum and Minimum Zip Codes

GroupOperation sumZips = group("state").count().as("zipCount");
SortOperation sortByCount = sort(Direction.ASC, "zipCount");
GroupOperation groupFirstAndLast = group().first("_id").as("minZipState")
  .first("zipCount").as("minZipCount").last("_id").as("maxZipState")
  .last("zipCount").as("maxZipCount");

Aggregation aggregation = newAggregation(sumZips, sortByCount, groupFirstAndLast);

AggregationResults<Document> result = mongoTemplate
  .aggregate(aggregation, "zips", Document.class);
Document document= result.getUniqueMappedResult();

Aggregation Using MongoRepository

@Document(collection = "property")
public class Property {

    @Id
    private String id;
    @Field("price")
    private int price;
    @Field("area")
    private int area;
    @Field("property_type")
    private String propertyType;
    @Field("transaction_type")
    private String transactionType;
    
    // Constructor, getters, setters, toString()
    
}
@Aggregation(pipeline = {
        "Operation/Stage 1...",
        "Operation/Stage 2...",
        "Operation/Stage 3...",
})
List<Property> someMethod();
@Aggregation(pipeline = {
    "{'$match':{'transaction_type':'For Sale', 'price' : {$gt : 100000}}",
})
List<Property> findExpensivePropertiesForSale();
@Aggregation(pipeline = {
        "{'$match':{'transaction_type': ?0, 'price' : {$gt : ?1}}",
})
List<Property> findPropertiesByTransactionTypeAndPriceGTPositional(String transactionType, int price);

@Aggregation(pipeline = {
        "{'$match':{'transaction_type': #{#transactionType}, 'price' : {$gt : #{#price}}}",
})
List<Property> findPropertiesByTransactionTypeAndPriceGTNamed(@Param("transactionType") String transactionType, @Param("price") int price);
@Aggregation(pipeline = {
        "{'$match':{'transaction_type':?0, 'price': {$gt: ?1} }}",
        "{'$sample':{size:?2}}",
        "{'$sort':{'area':-1}}"
})
List<Property> findPropertiesByTransactionTypeAndPriceGT(String transactionType, int price, int sampleSize);
@Aggregation(pipeline = {
        "{'$match':{'transaction_type':?0, 'price': {$gt: ?1} }}",
        "{'$sample':{size:?2}}",
        "{'$sort':{'area':-1}}"
})
Iterable<Property> findPropertiesByTransactionTypeAndPriceGTPageable(String transactionType, int price, int sampleSize, Pageable pageable);

References
https://www.baeldung.com/spring-data-mongodb-projections-aggregations
https://stackabuse.com/spring-data-mongodb-guide-to-the-aggregation-annotation/

Projection in Spring Data MongoDB

In MongoDB, Projections are a way to fetch only the required fields of a document from a database. This reduces the amount of data that has to be transferred from database server to client and hence increases performance.

@Document
public class User {
    @Id
    private String id;
    private String name;
    private Integer age;
    
    // standard getters and setters
}

Projections Using MongoTemplate

Query query = new Query();
query.fields().include("name").exclude("id");
List<User> john = mongoTemplate.find(query, User.class);

Projections Using MongoRepository

@Query(value="{}", fields="{name : 1, _id : 0}")
List<User> findNameAndExcludeId();

References
https://www.baeldung.com/spring-data-mongodb-projections-aggregations

Use @Query annotation in Spring Data MongoDB

With this annotation, we can specify a raw query as a Mongo JSON query string.

FindBy

@Query("{ 'name' : ?0 }")
List<User> findUsersByName(String name);
List<User> users = userRepository.findUsersByName("Eric");

$regex

@Query("{ 'name' : { $regex: ?0 } }")
List<User> findUsersByRegexpName(String regexp);
List<User> users = userRepository.findUsersByRegexpName("^A");
List<User> users = userRepository.findUsersByRegexpName("c$");

$lt and $gt

@Query("{ 'age' : { $gt: ?0, $lt: ?1 } }")
List<User> findUsersByAgeBetween(int ageGT, int ageLT);
List<User> users = userRepository.findUsersByAgeBetween(20, 50);

References
https://www.baeldung.com/queries-in-spring-data-mongodb

Aggregation in MongoDB

Aggregation Operations

Aggregation operations process multiple documents and return computed results. You can use aggregation operations to:

  • Group values from multiple documents together.
  • Perform operations on the grouped data to return a single result.
  • Analyze data changes over time.

Aggregation Pipelines

An aggregation pipeline consists of one or more stages that process documents:

  • Each stage performs an operation on the input documents. For example, a stage can filter documents, group documents, and calculate values.
  • The documents that are output from a stage are passed to the next stage.
  • An aggregation pipeline can return results for groups of documents. For example, return the total, average, maximum, and minimum values.
db.orders.insertMany( [
   { _id: 0, name: "Pepperoni", size: "small", price: 19,
     quantity: 10, date: ISODate( "2021-03-13T08:14:30Z" ) },
   { _id: 1, name: "Pepperoni", size: "medium", price: 20,
     quantity: 20, date : ISODate( "2021-03-13T09:13:24Z" ) },
   { _id: 2, name: "Pepperoni", size: "large", price: 21,
     quantity: 30, date : ISODate( "2021-03-17T09:22:12Z" ) },
   { _id: 3, name: "Cheese", size: "small", price: 12,
     quantity: 15, date : ISODate( "2021-03-13T11:21:39.736Z" ) },
   { _id: 4, name: "Cheese", size: "medium", price: 13,
     quantity:50, date : ISODate( "2022-01-12T21:23:13.331Z" ) },
   { _id: 5, name: "Cheese", size: "large", price: 14,
     quantity: 10, date : ISODate( "2022-01-12T05:08:13Z" ) },
   { _id: 6, name: "Vegan", size: "small", price: 17,
     quantity: 10, date : ISODate( "2021-01-13T05:08:13Z" ) },
   { _id: 7, name: "Vegan", size: "medium", price: 18,
     quantity: 10, date : ISODate( "2021-01-13T05:10:13Z" ) }
] )

Calculate Total Order Quantity

db.orders.aggregate( [

   // Stage 1: Filter pizza order documents by pizza size
   {
      $match: { size: "medium" }
   },

   // Stage 2: Group remaining documents by pizza name and calculate total quantity
   {
      $group: { _id: "$name", totalQuantity: { $sum: "$quantity" } }
   }

] )

Example output:

[
   { _id: 'Cheese', totalQuantity: 50 },
   { _id: 'Vegan', totalQuantity: 10 },
   { _id: 'Pepperoni', totalQuantity: 20 }
]

Calculate Total Order Value and Average Order Quantity

db.orders.aggregate( [

   // Stage 1: Filter pizza order documents by date range
   {
      $match:
      {
         "date": { $gte: new ISODate( "2020-01-30" ), $lt: new ISODate( "2022-01-30" ) }
      }
   },

   // Stage 2: Group remaining documents by date and calculate results
   {
      $group:
      {
         _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } },
         totalOrderValue: { $sum: { $multiply: [ "$price", "$quantity" ] } },
         averageOrderQuantity: { $avg: "$quantity" }
      }
   },

   // Stage 3: Sort documents by totalOrderValue in descending order
   {
      $sort: { totalOrderValue: -1 }
   }

 ] )

Example output:

[
   { _id: '2022-01-12', totalOrderValue: 790, averageOrderQuantity: 30 },
   { _id: '2021-03-13', totalOrderValue: 770, averageOrderQuantity: 15 },
   { _id: '2021-03-17', totalOrderValue: 630, averageOrderQuantity: 30 },
   { _id: '2021-01-13', totalOrderValue: 350, averageOrderQuantity: 10 }
]

References
https://www.mongodb.com/docs/manual/aggregation/
https://www.mongodb.com/docs/manual/core/aggregation-pipeline/

Selecting MongoDB Fields Using Projection Queries

Queries in MongoDB return all fields in matching documents. To limit the amount of data that MongoDB sends to applications, you can include a projection document to specify or restrict fields to return.

db.inventory.insertMany( [
  { item: "journal", status: "A", size: { h: 14, w: 21, uom: "cm" }, instock: [ { warehouse: "A", qty: 5 } ] },
  { item: "notebook", status: "A",  size: { h: 8.5, w: 11, uom: "in" }, instock: [ { warehouse: "C", qty: 5 } ] },
  { item: "paper", status: "D", size: { h: 8.5, w: 11, uom: "in" }, instock: [ { warehouse: "A", qty: 60 } ] },
  { item: "planner", status: "D", size: { h: 22.85, w: 30, uom: "cm" }, instock: [ { warehouse: "A", qty: 40 } ] },
  { item: "postcard", status: "A", size: { h: 10, w: 15.25, uom: "cm" }, instock: [ { warehouse: "B", qty: 15 }, { warehouse: "C", qty: 35 } ] }
]);

Return All Fields in Matching Documents ( Without Projection )

db.inventory.find( { status: "A" } )

Return the Specified Fields and the _id Field Only

db.inventory.find( { status: "A" }, { item: 1, status: 1 } )

Suppress _id Field

db.inventory.find( { status: "A" }, { item: 1, status: 1, _id: 0 } )

Return All But the Excluded Fields

db.inventory.find( { status: "A" }, { status: 0, instock: 0 } )

Return Specific Fields in Embedded Documents

db.inventory.find(
   { status: "A" },
   { item: 1, status: 1, "size.uom": 1 }
)

Suppress Specific Fields in Embedded Documents

db.inventory.find(
   { status: "A" },
   { "size.uom": 0 }
)

References
https://www.mongodb.com/docs/manual/tutorial/project-fields-from-query-results/

Log All Queries in MongoDB using Database Profiler

Enable logging queries until restart

mongo -u username -p password
use myDb
db.setProfilingLevel(2)

Profiling Status

The current profile level, slowOpThresholdMs setting, and slowOpSampleRate setting.

> db.getProfilingStatus()
{ "was" : 2, "slowms" : 1, "sampleRate" : 1 }

Starting in MongoDB 4.4.2, you can set a filter to control which operations are logged by the profiler.

Enable logging queries permanently

nano /etc/mongod.conf

Log queries take more than 1 mills  to run :

operationProfiling:
   mode: all
   slowOpThresholdMs: 1
   slowOpSampleRate: 1.0

Log queries with filter :

operationProfiling:
   mode: all
   filter: '{ op: "query", millis: { $gt: 2000 } }'

Change Size of system.profile Collection on the Primary

The <database>.system.profile collection stores database profiling information.

To change the size of the system.profile collection on the primary, you must:

  • Disable profiling.
  • Drop the system.profile collection.
  • Create a new system.profile collection.
  • Re-enable profiling.

For example, to create a new system.profile collection that is 4000000 bytes (4 MB), use the following sequence of operations in mongosh:

db.setProfilingLevel(0)

db.system.profile.drop()

db.createCollection( "system.profile", { capped: true, size:4000000 } )

db.setProfilingLevel(1)

References
https://stackoverflow.com/questions/15204341/mongodb-logging-all-queries
https://stackoverflow.com/questions/18510966/why-the-queries-inside-the-system-profile-collection-are-being-overwritten
https://www.mongodb.com/docs/manual/tutorial/manage-the-database-profiler/
https://www.mongodb.com/docs/manual/reference/configuration-options/

SimpleTrigger vs CronTrigger in Quartz Scheduler

SimpleTrigger
SimpleTrigger is used for scenarios in which we need to execute a job at a specific moment in time.

SimpleTrigger trigger = (SimpleTrigger) TriggerBuilder.newTrigger()
  .withIdentity("trigger1", "group1")
  .startAt(myStartTime)
  .forJob("job1", "group1")
  .build();

CronTrigger
The CronTrigger is used when we need schedules based on calendar-like statements.

CronTrigger trigger = TriggerBuilder.newTrigger()
  .withIdentity("trigger3", "group1")
  .withSchedule(CronScheduleBuilder.cronSchedule("0 0/2 8-17 * * ?"))
  .forJob("myJob", "group1")
  .build();

References
https://www.baeldung.com/quartz
https://stackoverflow.com/questions/26403391/difference-between-cron-trigger-and-simple-trigger-in-quartz-scheduler
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html
https://www.freeformatter.com/cron-expression-generator-quartz.html