通过 eShopOnContainers 项目学习一下微服务

news2024/12/24 13:26:50

这里是项目地址 https://github.com/dotnet-architecture/eShopOnContainers, 这是微软创建的一个基于 .NET 平台的微服务架构的示例应用程序,里面基本上市面上主流的时髦的技术都用上了。
因为涉及的内容比较多,所以我们只简单查看一下微服务的代码实现和 DockerFile 的编写,至于K8s,网关,鉴权等,我们不查看。

首先查看项目结构

我们主要查看 Service 文件夹里面微服务的代码实现。具体来说也就是 Basket 购物车,catalog 商品目录,Ordering 订单微服务的实现。

查看 Basket.API 项目, Program.cs 和 Startup.cs 中的启动和配置我们直接跳过,直接查看 Controllers,下面是 BasketController 中 CheckoutAsync 方法的代码

[Route("checkout")]
[HttpPost]
[ProducesResponseType((int)HttpStatusCode.Accepted)]
[ProducesResponseType((int)HttpStatusCode.BadRequest)]
public async Task<ActionResult> CheckoutAsync([FromBody] BasketCheckout basketCheckout, [FromHeader(Name = "x-requestid")] string requestId)
{
    var userId = _identityService.GetUserIdentity();

    basketCheckout.RequestId = (Guid.TryParse(requestId, out Guid guid) && guid != Guid.Empty) ?
        guid : basketCheckout.RequestId;

    var basket = await _repository.GetBasketAsync(userId);

    if (basket == null)
    {
        return BadRequest();
    }

    var userName = this.HttpContext.User.FindFirst(x => x.Type == ClaimTypes.Name).Value;

    var eventMessage = new UserCheckoutAcceptedIntegrationEvent(userId, userName, basketCheckout.City, basketCheckout.Street,
        basketCheckout.State, basketCheckout.Country, basketCheckout.ZipCode, basketCheckout.CardNumber, basketCheckout.CardHolderName,
        basketCheckout.CardExpiration, basketCheckout.CardSecurityNumber, basketCheckout.CardTypeId, basketCheckout.Buyer, basketCheckout.RequestId, basket);

    // Once basket is checkout, sends an integration event to
    // ordering.api to convert basket to order and proceeds with
    // order creation process
    try
    {
        _eventBus.Publish(eventMessage);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "ERROR Publishing integration event: {IntegrationEventId} from {AppName}", eventMessage.Id, Program.AppName);

        throw;
    }

    return Accepted();
}

该方法发送了一个集成事件,搜索 EventBus 的实现,我们发现有两个

ServiceBus 是微软自己云服务的事件总线,我们查看 RabbitMQ 的实现,查看 Publish 方法

public void Publish(IntegrationEvent @event)
{
    if (!_persistentConnection.IsConnected)
    {
        _persistentConnection.TryConnect();
    }

    var policy = RetryPolicy.Handle<BrokerUnreachableException>()
        .Or<SocketException>()
        .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
        {
            _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
        });

    var eventName = @event.GetType().Name;

    _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);

    using var channel = _persistentConnection.CreateModel();
    _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);

    channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

    var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions
    {
        WriteIndented = true
    });

    policy.Execute(() =>
    {
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2; // persistent

            _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);

        channel.BasicPublish(
            exchange: BROKER_NAME,
            routingKey: eventName,
            mandatory: true,
            basicProperties: properties,
            body: body);
    });
}

可以看到他利用 RetryPolicy 类创建了一个重试策略,当实际的发送动作抛出了 SocketException 的时候,进行重试。 RetryPolicy 类是 Polly 库提供的一个类,Poll 的仓库地址为 https://github.com/App-vNext/Polly。
这里指定发生 SocketException 以后重试的原因是为了方式应用程序已经启动而 RabbitMQ 还没有启动完成。然后其他的就没什么了,因为是简单的 CRUD 操作,所以使用的是贫血模型,并且使用 Redis 作为数据存储 DB。

然后再查看 Catalog.API 下面的 CatalogController 文件,我们注意到其没有注入任何域模型的 Repository,而是直接注入了 CatalogContext 这个 DbContext 的派生类(实际我也发现大部分时候 Repository 实际上没有什么用,还不如直接使用 DbContext,或许使用 Repository 的好处就是方便单元测试,毕竟 mock 一个 Repository 要比 mock 整个 DbContext 要简单)。

我们看到 UpdateProductAsync 方法

public async Task<ActionResult> UpdateProductAsync([FromBody] CatalogItem productToUpdate)
{
    var catalogItem = await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id == productToUpdate.Id);

    if (catalogItem == null)
    {
        return NotFound(new { Message = $"Item with id {productToUpdate.Id} not found." });
    }

    var oldPrice = catalogItem.Price;
    var raiseProductPriceChangedEvent = oldPrice != productToUpdate.Price;

    // Update current product
    catalogItem = productToUpdate;
    _catalogContext.CatalogItems.Update(catalogItem);

    if (raiseProductPriceChangedEvent) // Save product's data and publish integration event through the Event Bus if price has changed
    {
        //Create Integration Event to be published through the Event Bus
        var priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id, productToUpdate.Price, oldPrice);

        // Achieving atomicity between original Catalog database operation and the IntegrationEventLog thanks to a local transaction
        await _catalogIntegrationEventService.SaveEventAndCatalogContextChangesAsync(priceChangedEvent);

        // Publish through the Event Bus and mark the saved event as published
        await _catalogIntegrationEventService.PublishThroughEventBusAsync(priceChangedEvent);
    }
    else // Just save the updated product because the Product's Price hasn't changed.
    {
        await _catalogContext.SaveChangesAsync();
    }

    return CreatedAtAction(nameof(ItemByIdAsync), new { id = productToUpdate.Id }, null);
}

可以看到当产品价格发生了更改以后其首先保存了修改价格的实现和 DbContext 追踪的更改,然后发布了 ProductPriceChangedIntegrationEvent 这个集成事件,具体的我们查看
这两个方法的实现。

public async Task SaveEventAndCatalogContextChangesAsync(IntegrationEvent evt)
{
    _logger.LogInformation("----- CatalogIntegrationEventService - Saving changes and integrationEvent: {IntegrationEventId}", evt.Id);

    //Use of an EF Core resiliency strategy when using multiple DbContexts within an explicit BeginTransaction():
    //See: https://docs.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency            
    await ResilientTransaction.New(_catalogContext).ExecuteAsync(async () =>
    {
        // Achieving atomicity between original catalog database operation and the IntegrationEventLog thanks to a local transaction
        await _catalogContext.SaveChangesAsync();
        await _eventLogService.SaveEventAsync(evt, _catalogContext.Database.CurrentTransaction);
    });
}

//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

public async Task PublishThroughEventBusAsync(IntegrationEvent evt)
{
    try
    {
        _logger.LogInformation("----- Publishing integration event: {IntegrationEventId_published} from {AppName} - ({@IntegrationEvent})", evt.Id, Program.AppName, evt);

        await _eventLogService.MarkEventAsInProgressAsync(evt.Id);
        _eventBus.Publish(evt);
        await _eventLogService.MarkEventAsPublishedAsync(evt.Id);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "ERROR Publishing integration event: {IntegrationEventId} from {AppName} - ({@IntegrationEvent})", evt.Id, Program.AppName, evt);
        await _eventLogService.MarkEventAsFailedAsync(evt.Id);
    }
}

可以看到 SaveEventAndCatalogContextChangesAsync 方法首先保存了 CatalogContext 追踪的所有更改,然后保存了集成事件事件到数据库,而此时集成事件实际上还没有被发布。

可以看到 PublishThroughEventBusAsync 实际发送了事件,并且标记了事件的状态,这是为了最终一致性而做的工作,也许还需要一个后台任务自动重发处理失败的事件。

然后看到 Ordering.API,我们首先看看其对集成事件是如何处理的。看到 Startup 中的 ConfigureEventBus 方法

private void ConfigureEventBus(IApplicationBuilder app)
{
    var eventBus = app.ApplicationServices.GetRequiredService<BuildingBlocks.EventBus.Abstractions.IEventBus>();

    eventBus.Subscribe<UserCheckoutAcceptedIntegrationEvent, IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>>();
    eventBus.Subscribe<GracePeriodConfirmedIntegrationEvent, IIntegrationEventHandler<GracePeriodConfirmedIntegrationEvent>>();
    eventBus.Subscribe<OrderStockConfirmedIntegrationEvent, IIntegrationEventHandler<OrderStockConfirmedIntegrationEvent>>();
    eventBus.Subscribe<OrderStockRejectedIntegrationEvent, IIntegrationEventHandler<OrderStockRejectedIntegrationEvent>>();
    eventBus.Subscribe<OrderPaymentFailedIntegrationEvent, IIntegrationEventHandler<OrderPaymentFailedIntegrationEvent>>();
    eventBus.Subscribe<OrderPaymentSucceededIntegrationEvent, IIntegrationEventHandler<OrderPaymentSucceededIntegrationEvent>>();
}

实际上主要是 IntegrationEvent 和 IIntegrationEventHandler 接口,通过 Subscribe 方法注册,我们仍然查看 EventBusRabbitMQ(EventBus 的其中一个实现)。

当调用 Subscribe 方法时候,判断事件处理器是否已经存在,如果不存在将其添加到 IEventBusSubscriptionsManager,然后注册消息处理的回调。回调函数主要的中一个主要的函数就是 ProcessEvent 函数,我们可以简单看看

private async Task ProcessEvent(string eventName, string message)
{
    _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);

    if (_subsManager.HasSubscriptionsForEvent(eventName))
    {
        await using var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME);
        var subscriptions = _subsManager.GetHandlersForEvent(eventName);
        foreach (var subscription in subscriptions)
        {
            if (subscription.IsDynamic)
            {
                if (scope.ResolveOptional(subscription.HandlerType) is not IDynamicIntegrationEventHandler handler) continue;
                using dynamic eventData = JsonDocument.Parse(message);
                await Task.Yield();
                await handler.Handle(eventData);
            }
            else
            {
                var handler = scope.ResolveOptional(subscription.HandlerType);
                if (handler == null) continue;
                var eventType = _subsManager.GetEventTypeByName(eventName);
                var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true });
                var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);

                await Task.Yield();
                await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
            }
        }
    }
    else
    {
        _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
    }
}

我们直接看不是 IDynamicIntegrationEventHandler 的情况,也就是 else 的那一部分。可以看到实际上这部分很简单,也就是将对应的 EventHandler 从 IOC 容器中取出,然后直接调用其的 Handle 方法
就可以了。不过 Task.Yield() 调用在这里是干什么呢?这里是让出当前线程,然后让后面的那一部分,也就是 Invoke Handle 方法的工作重新排队。

TODO MediaR 的使用
TODO 实现 CQRS
TODO Docker Docker-compose

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/183243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信息抽取命名实体识别和关系抽取)

信息抽取的定义为&#xff1a;从自然语言文本中抽取指定类型的实体&#xff0c;关系、事件等事实信息。并形成结构化数据输出的文本处理技术。 信息抽取是从文本数据中抽取特定信息的一种技术&#xff0c;文本数据由医学具体的单位构成&#xff0c;例如&#xff0c;句子、段落、…

JavaWeb—Vue的简单介绍

1 Vue介绍 概述 Vue是一套构建用户界面的渐进式前端框架。只关注视图层&#xff0c;并且非常容易学习&#xff0c;还可以很方便的与其它库或已有项目整合。通过尽可能简单的API来实现响应数据的绑定和组合的视图组件。 数据渲染 数据库 --JDBC–> java程序 --http协议–>…

《MySQL实战45讲》——学习笔记23 “binlogredolog 的写入机制/组提交机制“

本篇主要介绍数据的可靠性有关的知识&#xff0c;包括binlog的写入机制和redolog的写入机制&#xff0c;通过了解这些机制从而可以在MySQL的IO性能瓶颈上做些优化&#xff1b;前文介绍了MySQL在可靠性、性能相关的概念&#xff0c;包括WAL技术、redolog与binlog、2阶段提交、ch…

阿里云图标使用 (symbol 引用方式)

阿里云图标网址: https://www.iconfont.cn/ 一、登录注册 这个简单&#xff0c;就不说了 二、给当前项目找图库 2.1、添加项目 2.2、寻找图标添加入库 添加入库 2.3、打开入库 的图标添加到指定项目 添加到当前项目 1 2 三、项目使用图标 ( symbol 引用方式) 3.1、下…

《HelloGitHub》第 82 期

兴趣是最好的老师&#xff0c;HelloGitHub 让你对编程感兴趣&#xff01;简介HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。https://github.com/521xueweihan/HelloGitHub这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等&#xff0c;涵盖多种编程语言 Pyth…

Studio One6有哪些新功能及系统配置要求介绍

Studio One6全新版本上线记录、生产、混合、掌握和执行所有操作。从工作室到舞台&#xff0c;Studio One6以易用为核心&#xff0c;是您的创意合作伙伴。当你准备好登上舞台时&#xff0c;Studio One就在那里。只有Studio One从最初的灵感到完整的制作&#xff0c;最终混音到精…

一个数码管显示0-F

数码管的一种是半导体发光器件&#xff0c;数码管可分为七段数码管和八段数码管&#xff0c;区别在于八段数码管比七段数码管多一个用于显示小数点的发光二极管单元DP&#xff08;decimal point&#xff09;&#xff0c;其基本单元是发光二极管。七段数码管是一类价格便宜使用简…

VuePress 搭建结合GitHub Pages CI

简介 VuePress 是尤雨溪&#xff08;vue.js 框架作者&#xff09;4月12日发布的一个全新的基于 vue 的静态网站生成器&#xff0c;实际上就是一个 vue 的 spa 应用&#xff0c;内置 webpack&#xff0c;可以用来写文档。详见 VuePress中文网 其实类似的建站工具有很多&#x…

Java多线程 - 创建线程池的方法 - ThreadPoolExecutor和Executors

文章目录线程池(重点)线程池介绍实现线程池的方式方式一: 实现类ThreadPoolExecutorThreadPoolExecutor构造器的参数线程池处理Runnable任务线程池处理Callable任务方式二: Executors工具类创建线程池线程池(重点) 线程池介绍 什么是线程池? 线程池就是一个可以复用线程的技…

以太网报文详解

以太网数据帧格式 以太网链路传输的数据包称做以太帧&#xff0c;或者以太网数据帧。在以太网中&#xff0c;网络访问层的软件必须把数据转换成能够通过网络适配器硬件进行传输的格式。 以太帧的工作机制 当以太网软件从网络层接收到数据报之后&#xff0c;需要完成如下操作&am…

模拟实现stack queue/dequeue/适配器/优先级队列/仿函数

⭐前言&#xff1a;学习C的STL&#xff0c;我们不仅仅要要求自己能够熟练地使用上层语法&#xff0c;我们还必须要求自己了解其底层原理&#xff0c;不需要了解得太深入&#xff0c;但一定得知道我们写出的各种代码后面&#xff0c;究竟采用了哪种设计思想&#xff0c;为什么要…

口碑巨制《流浪地球2》,再燃中国科幻电影新高度!

2019年&#xff0c;中国本土科幻电影《流浪地球》以炸裂之势吸引一众目光。上映26天&#xff0c;票房突破45亿&#xff0c;强势开启中国科幻电影的元年。如今时隔4年&#xff0c;《流浪地球2》再度登陆春节档&#xff0c;票房口碑双丰收&#xff0c;上映四天票房破13亿、淘票票…

Android渗透测试12:IDA动态调试so

0x00 前言 上一篇分享了使用 Android studio 和 Jeb 对 Apk 文件直接进行动态调试&#xff0c;本文将分享使用 IDA pro 调试 so 。 调试的 apk 文件还是使用 CTF案例4 的文件&#xff0c;已经上传到知识星球&#xff0c;可自行下载 本文涉及技术&#xff1a; IDA pro 工具使…

论文解读 - 城市自动驾驶车辆运动规划与控制技术综述 (第4部分)

文章目录&#x1f697; IV. Mothon Planning&#xff08;运动规划&#xff09;&#x1f7e2; D. Graph Search Methods&#xff08;图搜索算法&#xff09;&#x1f7e5; 1) Lane Graph&#xff08;车道图&#xff09;&#x1f7e7; 2) Geometric Methods&#xff08;几何方法&…

AtCoder Beginner Contest 287 A-G 赛时思路+正解

一把给我加到1219了&#xff0c;青大小蒟蒻表示很开心。 A - Majority 题意 问你"For""For""For"字符串数量是否比"Against""Against""Against"数量多。 思路 mapmapmap暴力即可。 A题代码 B - Postal Card 题意…

电脑技巧:教你关闭Win11内存压缩,解决电脑卡顿的问题

很多朋友都注意到&#xff0c;Win11默认开启了内存压缩功能。内存压缩顾名思义&#xff0c;可以压缩内存中的数据&#xff0c;让内存占用更少&#xff0c;同时减少Swap频次&#xff0c;带来更高的I/O效率。 但与此同时&#xff0c;压缩数据需要耗费CPU资源&#xff0c;一些朋友…

Dr4g0n-b4ll靶机总结

Dr4g0n-b4ll靶机渗透测试总结 靶机下载地址: https://download.vulnhub.com/dr4g0nb4ll/Dr4g0n-b4ll.zip 打开靶机,使用nmap扫描靶机的ip和所有开放的端口 可以看到靶机开放了80端口和22端口 根据80端口打开网站 信息收集,目录爆破 在robots.txt下发现一串base64编码 eW91IG…

编写循环(RH294)

循环这东西你早就懂的不是么就像python里的for一样在ansible中 使用loop关键字来实现迭代简单循环简单循环中一般使用loop关键字来开始循环使用循环变量item来存储每个迭代过程中使用的值举个例子 栗子啊首先让我们拿出两个任务片段- name: Postfix is runningservice:name: po…

索引15连问

前言 大家好&#xff0c;我是田螺。 金三银四很快就要来啦&#xff0c;准备了索引的15连问&#xff0c;相信大家看完肯定会有帮助的。 公众号&#xff1a;捡田螺的小男孩 1. 索引是什么&#xff1f; 索引是一种能提高数据库查询效率的数据结构。它可以比作一本字典的目录&am…

从C语言的使用转换到C++(下篇)——刷题、竞赛篇

目录 一、CSTL的简介 二、STL的使用详解 2、1 STL之动态数组vector的使用 2、2 STL之集合set的使用 2、3 STL之映射map的使用 2、4 STL之栈stack的使用 2、5 STL之队列queue的使用 2、6 STL之unordered_map和unordered_set的使用 三、总结 标题&#xff1a;从C语言的使用转换…