Skip to content
This repository has been archived by the owner on Jan 15, 2021. It is now read-only.

Commit

Permalink
Checkout service produces Kafka records for orders
Browse files Browse the repository at this point in the history
  • Loading branch information
chlowell committed May 22, 2017
1 parent ac07665 commit 16d9dd0
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 26 deletions.
3 changes: 2 additions & 1 deletion checkoutService/CheckoutService.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<RuntimeFrameworkVersion>1.1.1</RuntimeFrameworkVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="1.1.2"/>
<PackageReference Include="Microsoft.AspNetCore.Routing" Version="1.1.1"/>
<PackageReference Include="Microsoft.AspNetCore.Server.IISIntegration" Version="1.1.1"/>
Expand All @@ -23,4 +24,4 @@
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="1.1.1"/>
<PackageReference Include="MongoDB.Driver" Version="2.4.3"/>
</ItemGroup>
</Project>
</Project>
31 changes: 24 additions & 7 deletions checkoutService/Controllers/OrderController.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
using CheckoutService.Models;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Primitives;
using MongoDB.Driver;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using MongoDB.Driver;
using CheckoutService.Models;
using Microsoft.Extensions.Primitives;

namespace CheckoutService.Controllers
{
[Route("api/[controller]")]
public class OrderController : Controller
{
IMongoDbService _database;
Producer<Null, string> _producer;
string _topic;

public OrderController(IMongoDbService database)
public OrderController(IMongoDbService database, IOptions<KafkaSettings> kafkaSettings)
{
_database = database;

var config = new Dictionary<string, object> {
{ "bootstrap.servers", kafkaSettings.Value.Broker }
};
_producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8));
_topic = kafkaSettings.Value.Topic;
}

[HttpGet]
Expand Down Expand Up @@ -47,7 +59,7 @@ public async Task<IActionResult> Create([FromBody] Order order)
{
return BadRequest($"Invalid order values. Order:{order} UserId:{userId}");
}

//When an Order is created, the service assumes that any Items that should be added to the order will be sent
//in the same POST request. If no items are specified, an empty list of Items will be created by default.
if (order.Items == null)
Expand All @@ -57,6 +69,11 @@ public async Task<IActionResult> Create([FromBody] Order order)

order.UserId = userId;
await _database.GetOrderCollection().InsertOneAsync(order);

// produce a Kafka record so the sticker service can update popularity scores
var recordJson = JsonConvert.SerializeObject(order.Items);
var deliveryReport = await _producer.ProduceAsync(_topic, null, recordJson);

return Created(order.Id, order);
}

Expand Down
8 changes: 8 additions & 0 deletions checkoutService/KafkaSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace CheckoutService
{
public class KafkaSettings
{
public string Broker { get; set; }
public string Topic { get; set; }
}
}
3 changes: 3 additions & 0 deletions checkoutService/Models/OrderItem.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
using System.ComponentModel.DataAnnotations;
using MongoDB.Bson.Serialization.Attributes;
using Newtonsoft.Json;

namespace CheckoutService.Models
{
public class OrderItem
{
[BsonId]
[JsonProperty("id")]
[Required]
public string Id {get; set;}

[Required]
[JsonProperty("quantity")]
public int Quantity {get; set;}
}
}
11 changes: 1 addition & 10 deletions checkoutService/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
using System;
using System.IO;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
Expand All @@ -25,23 +20,19 @@ public Startup(IHostingEnvironment env)

public IConfigurationRoot Configuration { get; }

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
// Adds services required for using options.
services.AddOptions();

// Register the IConfiguration instance which MyOptions binds against.
services.Configure<DatabaseSettings>(Configuration.GetSection("Database"));
services.Configure<KafkaSettings>(Configuration.GetSection("Kafka"));

// Add service for interacting with Mongo DB
services.AddSingleton<IMongoDbService, MongoDbService>();

// Add framework services.
services.AddMvc();
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole(Configuration.GetSection("Logging"));
Expand Down
4 changes: 4 additions & 0 deletions checkoutService/appsettings.dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@
"Host": "mongo",
"OrderCollectionName": "orders",
"FeedbackCollectionName": "feedbackEntries"
},
"Kafka": {
"Broker": "kafka:9092",
"Topic": "TrendingStickers"
}
}
2 changes: 1 addition & 1 deletion sessionService/routes/history.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ router.put('/:item_id', async (req, res) => {
// send a Kafka message to inform the sticker service of the view event
const payload = [{
topic: process.env.KAFKA_TOPIC,
messages: JSON.stringify([{ item_id: req.params.item_id }])
messages: JSON.stringify([{ id: req.params.item_id }])
}];
kafkaProducer.send(payload, (err, result) => {
if (err) {
Expand Down
8 changes: 4 additions & 4 deletions stickerService/itemPopularity.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ function increaseItemScoreAsync(itemId, increaseAmount) {
};

// we expect msg.value is JSON for this:
// [ { "item_id": string, "quantity": number? }, ... ]
// [ { "id": string, "quantity": number? }, ... ]
// where "quantity" is undefined for view events
const messageMalformed = offset => `message at offset ${offset} is malformed`;
function validateMessage(message) {
Expand All @@ -119,11 +119,11 @@ function validateMessage(message) {

let events = [];
for (const element of parsedMessage) {
if (!element.item_id) {
if (!element.id) {
return reject(messageMalformed(message.offset));
}

let event = { item_id: element.item_id };
let event = { id: element.id };
if (element.quantity) {
const quantity = parseInt(element.quantity);
if (!Number.isInteger(quantity)) {
Expand Down Expand Up @@ -161,7 +161,7 @@ async function handleMessage(msg) {
// arbitrary multiplier: 1 order is worth 3 views
// (item.quantity is undefined for view events)
const scoreIncrement = item.quantity ? item.quantity * 3 : 1;
return increaseItemScoreAsync(item.item_id, scoreIncrement);
return increaseItemScoreAsync(item.id, scoreIncrement);
}));
console.log(scoreMessages.join('\n'));

Expand Down
6 changes: 3 additions & 3 deletions stickerService/routes/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ router.post('/checkout', (req, res) => {

const orderedStickers = Object.keys(checkoutItems).map(item => {
return {
item_id: checkoutItems[item].id,
id: checkoutItems[item].id,
quantity: checkoutItems[item].quantity
}
};
});

const payloads = [{
Expand All @@ -39,7 +39,7 @@ router.post('/checkout', (req, res) => {

// emit a Kafka message for a view event
router.get('/view/:item_id', (req, res) => {
const message = JSON.stringify([{ item_id: req.params.item_id }]);
const message = JSON.stringify([{ id: req.params.item_id }]);
const payload = [{
topic: process.env.KAFKA_TOPIC,
messages: message
Expand Down

0 comments on commit 16d9dd0

Please sign in to comment.