Wikimedia To Opensearch

news2024/11/26 0:33:04

概览

  • Wikimedia ⇒ Kafka ⇒ Opensearch
  • Java Library:OKhttp3和OkHttp EventSource;
  • 生产者:Wikimedia:WikimediaChangeHandler和WikimediaChangeProducer;
  • 消费者:Opensearch:OpenSearchConsumer,opensearch-java + httpclient5;
  • https://stream.wikimedia.org/v2/stream/recentchange
  • https://esjewett.github.io/wm-eventsource-demo
  • https://codepen.io/Krinkle/pen/BwEKgW?editors=1010
  • Rest Api使用OpenSearch Dashboard,在线可使用Bonsai.io;

Kafka环境



version: '3.8'
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    privileged: true
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: '/tmp/kafka-log'
      CLUSTER_ID: 'YWU3MzE1YmVmYzhiMTFlZT'

      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'

      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'

      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
  default:
    name: network-common
    external: true
  • vi /opt/kafka/config/kraft/server.properties
#controller.quorum.voters=1@localhost:9093
controller.quorum.voters=1@192.168.0.123:9093

#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092

#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT_HOST://192.168.0.123:9092,PLAINTEXT://kafka:19092

Opensearch

Open Search Prerequisite

# disable memory paging and swapping performance
sudo swapoff -a

# edit sysctl config
sudo vi /etc/sysctl.conf

# add line to define desired value or change exist
vm.max_map_count=262144

# reload kernel parameter using sysctl
sudo sysctl -p

# verify change
cat /proc/sys/vm/max_map_count

Open Search Compose

 docker pull opensearchproject/opensearch:1.3.16 && \
        docker pull opensearchproject/opensearch-dashboards:1.3.16


version: '3.8'
services:
  opensearch:
    image: opensearchproject/opensearch:1.3.16
    container_name: opensearch
    environment:
      discovery.type: single-node
      plugins.security.disabled: true
      compatibility.override_main_response_version: true
    ports:
      - "9200:9200"
      - "9600:9600"
  opensearch-dashboard:
    image: opensearchproject/opensearch-dashboards:1.3.16
    container_name: opensearch-dashboard
    ports:
      - "5601:5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
  • http://192.168.0.123:5601
  • https://192.168.0.123:9200

Producer

Producer Dependency

<properties>
        <okhttp.eventsource>2.7.1</okhttp.eventsource>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>

<dependency>
    <groupId>com.launchdarkly</groupId>
    <artifactId>okhttp-eventsource</artifactId>
    <version>${okhttp.eventsource}</version>
</dependency>

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

WikimediaChangeHandler

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import java.lang.invoke.MethodHandles;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WikimediaChangeHandler implements EventHandler {

        private static final Logger logger = LoggerFactory
                .getLogger(MethodHandles.lookup().lookupClass());

        String topic;
        KafkaProducer<String, String> kafkaProducer;

        public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer,String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onOpen() {}

    @Override
    public void onClosed() {
        kafkaProducer.close();
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
            logger.error(messageEvent.getData());
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    @Override
    public void onComment(String comment) {}

    @Override
    public void onError(Throwable t) {
            logger.error("Stream Reading Failure!", t);
    }
}

WikimediaChangeProducer

import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.EventHandler;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.URI;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WikimediaChangeProducer {

        public static void main(String[] args) throws InterruptedException {
                String bootstrapServers = "192.168.0.123:9092";

            // create Producer properties
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

            String topic = "wikimedia.recentchange";

            EventHandler eventHandler = new WikimediaChangeHandler(kafkaProducer, topic);
            String url = "https://stream.wikimedia.org/v2/stream/recentchange";
            EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));

            builder.connectTimeout(Duration.ofMinutes(10));
            //注:需科学上网
            builder.proxy("127.0.0.1",1080);
            EventSource eventSource = builder.build();

            // start the producer in another thread
            eventSource.start();

            // we produce for 10 minutes and block the program until then
            TimeUnit.MINUTES.sleep(10);
          }

}

Consumer

Consumer Dependency

<properties>
        <opensearch.java>2.10.1</opensearch.java>
</properties>

<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>${opensearch.java}</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

OpenSearchConsumer

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.IndexResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenSearchConsumer {

        private static final Logger logger = LoggerFactory
                .getLogger(MethodHandles.lookup().lookupClass());

        public static OpenSearchClient connect(String scheme,String hostName,int port) {
                final HttpHost host = new HttpHost(scheme,hostName,port);

            final ApacheHttpClient5TransportBuilder builder =
                    ApacheHttpClient5TransportBuilder.builder(host);

            builder.setHttpClientConfigCallback(hcb -> {
                    final PoolingAsyncClientConnectionManager manager =
                            PoolingAsyncClientConnectionManagerBuilder.create().build();
                    return hcb.setConnectionManager(manager);
            });

            final OpenSearchTransport transport = builder.build();
            return new OpenSearchClient(transport);
        }

        public static OpenSearchClient connect() {
                return connect("http","192.168.0.123",9200);
        }

        public static boolean exist(OpenSearchClient client,String indexName)
                throws OpenSearchException, IOException {
                var existRequest = ExistsRequest.of(fn -> fn.index(indexName));
                BooleanResponse exist = client.indices().exists(existRequest);
                return exist.value();
        }

        public static void createIndex(OpenSearchClient client,
                String indexName) throws OpenSearchException, IOException {
                var exist = exist(client,indexName);
                if (exist) {
                        System.out.printf("index %s already exist!\n",indexName);
                } else {
                        var createRequest = new CreateIndexRequest.Builder().index(indexName).build();
                        client.indices().create(createRequest);
                }
        }

        //GET /_cat/indices?v
        public static void deleteIndex(OpenSearchClient client,
                String indexName) throws OpenSearchException, IOException {
                var exist = exist(client,indexName);
                if (!exist) {
                        System.out.printf("index %s not exist!\n",indexName);
                } else {
                        var deleteRequest = new DeleteIndexRequest.Builder().index(indexName).build();
                        client.indices().delete(deleteRequest);
                }
        }

        public static KafkaConsumer<String,String> createKafkaConsumer(){
                var boostrapServer = "192.168.0.123:9092";
                var groupId = "group-wikimedia-opensearch";
                Properties prop = new Properties();
                prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,boostrapServer);
                prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
                prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
                prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
                //earliest,latest etc.
                prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
                return new KafkaConsumer<>(prop);
        }

        public static void consume(OpenSearchClient client,String indexName)
                throws OpenSearchException, IOException {
                var consumer = createKafkaConsumer();
                var topic = "wikimedia.recentchange";
                consumer.subscribe(Arrays.asList(topic));

                while (true) {
                        var consumerRecord = consumer.poll(Duration.ofMillis(3000));
                        int recordCount = consumerRecord.count();
                        logger.info("receive %d record!",recordCount);

                        for (ConsumerRecord<String,String> cr : consumerRecord) {
                                //send record into OpenSearch
                                IndexData indexData = new IndexData(cr.value());
                                var indexRequest = new IndexRequest.Builder<IndexData>()
                                        .index(indexName).document(indexData).build();
                                IndexResponse response = client.index(indexRequest);
                                System.out.println(response.id());
                        }
                }
        }

        public static void main(String[] sa) throws OpenSearchException, IOException {
                var client = connect();
                var indexName = "wikimedia-opensearch";
                //createIndex(client, indexName);

                System.out.println("consuming start...");
                consume(client,indexName);
                System.out.println("consuming end...");
        }


        static class IndexData {
                private String wikiMediaValue;

                public IndexData(String wikiMediaValue) {
                        this.wikiMediaValue = wikiMediaValue;
                }

                @Override
                public String toString() {
                        return String.format("IndexData{wikiMediaValue='%s'}",wikiMediaValue);
                }

                public String getWikiMediaValue() {
                        return wikiMediaValue;
                }
                public void setWikiMediaValue(String wikiMediaValue) {
                        this.wikiMediaValue = wikiMediaValue;
                }
        }

}

Testing

Create Topic

  • 创建主题,并启动WikimediaChangeProducer;
  • 注:需科学上网,需科学上网,需科学上网;
./kafka-topics.sh \
        --bootstrap-server 192.168.0.123:9092  \
        --topic wikimedia.recentchange --create \
        --partitions 3 --replication-factor 1
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"93bf8bf8-4c36-4a49-b226-3a9311d2c906","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257037},"id":484017699,"type":"log","namespace":2,"title":"Участник:Sherbek Qarshiyev","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:Sherbek_Qarshiyev","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062090,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Sherbek Qarshiyev (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q46898283","request_id":"161daa3c-9f11-44f1-b042-209a3acabcf8","id":"65fb4ae3-86fd-47ab-a85f-f0dd26b04b66","dt":"2024-04-25T12:52:32Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257038},"id":2201214347,"type":"edit","namespace":0,"title":"Q46898283","title_url":"https://www.wikidata.org/wiki/Q46898283","comment":"/* wbsetclaimvalue:1| */ [[Property:P1476]]: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults","timestamp":1714049552,"user":"KrBot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2136975570&oldid=2136975561&rcid=2201214347","minor":false,"patrolled":true,"length":{"old":60872,"new":60869},"revision":{"old":2136975561,"new":2136975570},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Определено значение для утверждения: </span></span> <a href=\"/wiki/Property:P1476\" title=\"название | название произведения (книги, фильма, газетной статьи, произведения исполнительского искусства, веб-сайта)\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"ru\" dir=\"ltr\">название</span> <span class=\"wb-itemlink-id\">(P1476)</span></span></a>: Guidelines for diagnosis and therapy of patients with asthma 2005. The most important aspects for adults"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","request_id":"4c136271-a6cd-4ff6-a6b7-429d769ba5ba","id":"14fcefb7-1f73-40a9-9acc-539d97aa06c3","dt":"2024-04-25T12:52:32Z","domain":"en.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257039},"id":1769512861,"type":"edit","namespace":2,"title":"User:Ali Ahwazi/sandbox2","title_url":"https://en.wikipedia.org/wiki/User:Ali_Ahwazi/sandbox2","comment":"","timestamp":1714049552,"user":"Ali Ahwazi","bot":false,"notify_url":"https://en.wikipedia.org/w/index.php?diff=1220709553&oldid=1220709505","minor":false,"length":{"old":26671,"new":31548},"revision":{"old":1220709505,"new":1220709553},"server_url":"https://en.wikipedia.org","server_name":"en.wikipedia.org","server_script_path":"/w","wiki":"enwiki","parsedcomment":""}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","request_id":"be816a4f-be27-4c49-830a-31161665401f","id":"ec47eca3-b6a4-4b4f-ac71-b44369de940d","dt":"2024-04-25T12:52:33Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257040},"id":519472837,"type":"edit","namespace":100,"title":"Portail:Châteaux/Articles récents","title_url":"https://fr.wikipedia.org/wiki/Portail:Ch%C3%A2teaux/Articles_r%C3%A9cents","comment":"+ [[Château de Mielmont]]","timestamp":1714049553,"user":"OrlodrimBot","bot":true,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561930&oldid=214559720&rcid=519472837","minor":false,"patrolled":true,"length":{"old":779,"new":779},"revision":{"old":214559720,"new":214561930},"server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"+ <a href=\"/wiki/Ch%C3%A2teau_de_Mielmont\" title=\"Château de Mielmont\">Château de Mielmont</a>"}
20:52:31.936 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","request_id":"68d8c10a-31dc-44f3-8b1d-cb977dfd1602","id":"b8ab3286-f69f-466a-9a00-c5c12c176001","dt":"2024-04-25T12:16:20Z","domain":"ru.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257041},"id":484017700,"type":"log","namespace":2,"title":"Участник:Виктория Никитенко","title_url":"https://ru.wikipedia.org/wiki/%D0%A3%D1%87%D0%B0%D1%81%D1%82%D0%BD%D0%B8%D0%BA:%D0%92%D0%B8%D0%BA%D1%82%D0%BE%D1%80%D0%B8%D1%8F_%D0%9D%D0%B8%D0%BA%D0%B8%D1%82%D0%B5%D0%BD%D0%BA%D0%BE","comment":"Carn removed Carn from mentorship","timestamp":1714047380,"user":"Carn","bot":true,"log_id":101062091,"log_type":"growthexperiments","log_action":"setmentor","log_params":{"previous-mentor":"Carn","new-mentor":"Birulik"},"log_action_comment":"Carn установил Birulik как наставницу для Виктория Никитенко (предыдущий наставник Carn): Carn removed Carn from mentorship","server_url":"https://ru.wikipedia.org","server_name":"ru.wikipedia.org","server_script_path":"/w","wiki":"ruwiki","parsedcomment":"Carn removed Carn from mentorship"}

20:52:31.937 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","request_id":"739f7d34-fd23-4b37-ab77-87c95663aeda","id":"34ab12f5-7256-48e5-a4f8-b40bf95316ad","dt":"2024-04-25T12:52:33Z","domain":"oc.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257042},"id":10815379,"type":"new","namespace":0,"title":"Pairac lo Chasteu","title_url":"https://oc.wikipedia.org/wiki/Pairac_lo_Chasteu","comment":"Redireccion cap a [[Pairac (lo Chasteu)]]","timestamp":1714049553,"user":"PairacLoChasteu","bot":false,"notify_url":"https://oc.wikipedia.org/w/index.php?oldid=2436522&rcid=10815379","minor":false,"patrolled":false,"length":{"new":32},"revision":{"new":2436522},"server_url":"https://oc.wikipedia.org","server_name":"oc.wikipedia.org","server_script_path":"/w","wiki":"ocwiki","parsedcomment":"Redireccion cap a <a href=\"/wiki/Pairac_(lo_Chasteu)\" title=\"Pairac (lo Chasteu)\">Pairac (lo Chasteu)</a>"}
20:52:31.938 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","request_id":"7d6530d5-10a3-4628-a101-bc2e75b9a92f","id":"2eb4865e-b4ee-407c-9e19-b871967ed9e1","dt":"2024-04-25T12:52:30Z","domain":"fr.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257043},"id":519472838,"type":"categorize","namespace":14,"title":"Catégorie:Article à référence nécessaire","title_url":"https://fr.wikipedia.org/wiki/Cat%C3%A9gorie:Article_%C3%A0_r%C3%A9f%C3%A9rence_n%C3%A9cessaire","comment":"[[:20e armée (Union soviétique)]] ajoutée à la catégorie","timestamp":1714049550,"user":"Le Petit Chat","bot":false,"notify_url":"https://fr.wikipedia.org/w/index.php?diff=214561929&oldid=209346180&rcid=519472838","server_url":"https://fr.wikipedia.org","server_name":"fr.wikipedia.org","server_script_path":"/w","wiki":"frwiki","parsedcomment":"<a href=\"/wiki/20e_arm%C3%A9e_(Union_sovi%C3%A9tique)\" title=\"20e armée (Union soviétique)\">20e armée (Union soviétique)</a> ajoutée à la catégorie"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","request_id":"3d4237d1-1994-4c65-b41a-84ee1f1a05c6","id":"d9e3f133-2e22-4022-9449-65a78ed83452","dt":"2024-04-25T12:52:31Z","domain":"commons.wikimedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257044},"id":2478318778,"type":"categorize","namespace":14,"title":"Category:Milford, Derbyshire","title_url":"https://commons.wikimedia.org/wiki/Category:Milford,_Derbyshire","comment":"[[:File:The King William pub - geograph.org.uk - 5560373.jpg]] added to category","timestamp":1714049551,"user":"WereSpielChequers","bot":false,"notify_url":"https://commons.wikimedia.org/w/index.php?diff=871189763&oldid=871189716&rcid=2478318778","server_url":"https://commons.wikimedia.org","server_name":"commons.wikimedia.org","server_script_path":"/w","wiki":"commonswiki","parsedcomment":"<a href=\"/wiki/File:The_King_William_pub_-_geograph.org.uk_-_5560373.jpg\" title=\"File:The King William pub - geograph.org.uk - 5560373.jpg\">File:The King William pub - geograph.org.uk - 5560373.jpg</a> added to category"}
20:52:31.939 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","request_id":"c408c6e9-7e92-4f96-8a99-1d518f3af5ed","id":"e94ece30-3416-41e9-860d-d3547f2248d6","dt":"2024-04-25T12:52:33Z","domain":"ko.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257045},"type":"log","namespace":0,"title":"조상환","title_url":"https://ko.wikipedia.org/wiki/%EC%A1%B0%EC%83%81%ED%99%98","comment":"","timestamp":1714049553,"user":"Cho Sang Hwan","bot":false,"log_id":0,"log_type":"abusefilter","log_action":"hit","log_params":{"action":"edit","filter":"71","actions":"tag","log":1521905},"log_action_comment":"Cho Sang Hwan님이 [[조상환]]에서 \"edit\" 동작을 하여 [[특수:편집필터/71|필터 71]]이(가) 작동하였습니다. 조치: 태그 ([[특수:편집필터기록/1521905|자세한 사항]])","server_url":"https://ko.wikipedia.org","server_name":"ko.wikipedia.org","server_script_path":"/w","wiki":"kowiki","parsedcomment":""}
20:52:31.940 [okhttp-eventsource-events-[]-0] ERROR com.elf.kafka.producer.WikimediaChangeHandler - {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","request_id":"d2dfebf5-df74-43d5-860e-be964ee93420","id":"27e197b2-cfb1-4c65-9717-09bb25438f08","dt":"2024-04-25T12:52:33Z","domain":"os.wikipedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":5049257046},"id":1911685,"type":"new","namespace":14,"title":"Категори:Хуссар Голландийы чи амардис, уыдон","title_url":"https://os.wikipedia.org/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD","comment":"Ног фарс, йæ код райдайы афтæ: «[[Категори:Нидерландты чи амардис, уыдон]] [[Категори:Хуссар Голландийы зындгонд адæм|Амард]]»","timestamp":1714049553,"user":"Taamu","bot":false,"notify_url":"https://os.wikipedia.org/w/index.php?oldid=558822&rcid=1911685","minor":false,"patrolled":true,"length":{"new":167},"revision":{"new":558822},"server_url":"https://os.wikipedia.org","server_name":"os.wikipedia.org","server_script_path":"/w","wiki":"oswiki","parsedcomment":"Ног фарс, йæ код райдайы афтæ: «<a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%9D%D0%B8%D0%B4%D0%B5%D1%80%D0%BB%D0%B0%D0%BD%D0%B4%D1%82%D1%8B_%D1%87%D0%B8_%D0%B0%D0%BC%D0%B0%D1%80%D0%B4%D0%B8%D1%81,_%D1%83%D1%8B%D0%B4%D0%BE%D0%BD\" title=\"Категори:Нидерландты чи амардис, уыдон\">Категори:Нидерландты чи амардис, уыдон</a> <a href=\"/wiki/%D0%9A%D0%B0%D1%82%D0%B5%D0%B3%D0%BE%D1%80%D0%B8:%D0%A5%D1%83%D1%81%D1%81%D0%B0%D1%80_%D0%93%D0%BE%D0%BB%D0%BB%D0%B0%D0%BD%D0%B4%D0%B8%D0%B9%D1%8B_%D0%B7%D1%8B%D0%BD%D0%B4%D0%B3%D0%BE%D0%BD%D0%B4_%D0%B0%D0%B4%C3%A6%D0%BC\" title=\"Категори:Хуссар Голландийы зындгонд адæм\">Амард»</a>"}

……

Consume Message

  • 启动OpenSearchConsumer
# 此步骤可选
./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
        --topic wikimedia.recentchange --from-beginning
GET /_cat/indices?v

GET _search
{
  "query": {
    "match_all": {}
  }
}

GET /index_name/_search
{
  "query": {
    "match_all": {}
  }
}

Outro

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1671657.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【知识碎片】2024_05_13

本文记录了两道代码题【自除数】和【除自身以外数组的乘积】&#xff08;利用了前缀积和后缀积&#xff0c;值得再看&#xff09;&#xff0c;第二部分记录了关于指针数组和逗号表达式的两道选择题。 每日代码 自除数 . - 力扣&#xff08;LeetCode&#xff09; /*** Note: T…

k8s StatefulSet

Statefulset 一个 Statefulset 创建的每个pod都有一个从零开始的顺序索引&#xff0c;这个会体现在 pod 的名称和主机名上&#xff0c;同样还会体现在 pod 对应的固定存储上。这些 pod 的名称是可预知的&#xff0c;它是由 Statefulset 的名称加该实例的顺序索引值组成的。不同…

JUC下的ThreadLocalRandom详解

ThreadLocalRandom 是Java并发包&#xff08;java.util.concurrent&#xff09;中提供的一个随机数生成器类&#xff0c;它是从Java 7开始引入的。相较于传统的Math.random()或Random类&#xff0c;ThreadLocalRandom更适用于多线程环境&#xff0c;因为它为每个线程维护了一个…

汇昌联信电商:拼多多新手怎么做店铺的免费流量会慢慢起来?

在拼多多上开店&#xff0c;新手们往往面临着如何吸引免费流量的挑战。毕竟&#xff0c;流量是店铺生存和发展的血脉&#xff0c;没有流量&#xff0c;就没有销量&#xff0c;店铺也就失去了生命力。那么&#xff0c;作为拼多多新手&#xff0c;如何做才能让店铺的免费流量慢慢…

设计模式Java实现-迭代器模式

✨这里是第七人格的博客✨小七&#xff0c;欢迎您的到来~✨ &#x1f345;系列专栏&#xff1a;设计模式&#x1f345; ✈️本篇内容: 迭代器模式✈️ &#x1f371; 本篇收录完整代码地址&#xff1a;https://gitee.com/diqirenge/design-pattern &#x1f371; 楔子 很久…

人脸识别技术在访客管理中的应用

访客办理体系&#xff0c;能够使用于政府、戎行、企业、医院、写字楼等众多场所。在办理时&#xff0c;需求对来访人员身份进行精确认证&#xff0c;才能保证来访人员的进入对被访单位不被外来风险入侵。在核实身份时&#xff0c;比较好的方法就是选用人脸辨认技能&#xff0c;…

RAG应用中的路由模式

依据的用户查询意图在 RAG 应用程序使用“路由控制模式”可以帮助我们创建更强大的 RAG 应用程序。我们通常希望用户能够访问的数据可以来自各种来源,如报告、文档、图片、数据库和第三方系统。 对于基于业务的 RAG 应用程序,我们可能还希望用户能够与其它业务系统进行交互,…

基于SpringBoot+Vue的教师个人成果管理系统

初衷 在后台收到很多私信是咨询毕业设计怎么做的&#xff1f;有没有好的毕业设计参考? 能感觉到现在的毕业生和当时的我有着同样的问题&#xff0c;但是当时的我没有被骗&#xff0c; 因为现在很多人是被骗的&#xff0c;还没有出学校还是社会经验少&#xff0c;容易相信别人…

嗨动PDF编辑器怎么进行PDF编辑?看完了解

嗨动PDF编辑器怎么进行PDF编辑&#xff1f;PDF作为一种通用的文档格式&#xff0c;被广泛应用于商务交流、学术研究、电子出版等多个领域。但当面对需要修改或调整PDF文件内容时&#xff0c;就需要一款功能强大的PDF编辑器。嗨动PDF编辑器&#xff0c;它拥有对用户友好的界面设…

对中介者模式的理解

目录 一、场景1、题目 【[来源](https://kamacoder.com/problempage.php?pid1094)】1.1 题目描述1.2 输入描述1.3 输出描述1.4 输入示例1.5 输出示例 二、不采用中介者设计模式1 代码2 问题 三、中介者设计模式1 代码2 更好的例子 四、个人思考 一、场景 设计模式不是银弹&am…

Postman基础功能-变量设置与使用

如果你因失去太阳而流泪&#xff0c;那你也将失去群星了。大家好&#xff0c;在 API 测试的广袤世界中&#xff0c;Postman 犹如一座闪耀的灯塔&#xff0c;为我们指引着前行的方向。而其中的全局变量、集合变量和环境变量&#xff0c;更是如同隐藏的宝藏&#xff0c;蕴含着巨大…

从3D模型到渲染:完整的流程指南---模大狮模型网

在当今数字化时代&#xff0c;3D模型和渲染技术在各个领域中扮演着至关重要的角色&#xff0c;从影视制作到建筑设计&#xff0c;从游戏开发到工程模拟。了解如何将3D模型转化为逼真的渲染图像是数字创意领域从业者的关键技能之一。本文将为您介绍从3D模型到渲染的完整流程&…

企业使用合同档案管理系统软件有什么好处

使用合同档案管理系统软件可以带来以下好处&#xff1a; 1. 提高效率&#xff1a;合同管理软件可以自动化合同流程&#xff0c;包括创建、审批、签署和归档等。通过自动化&#xff0c;可以节省大量时间和精力&#xff0c;提高工作效率。 2. 降低风险&#xff1a;玖拓档案合同管…

LVS + Keepalived 高可用群集

一、准备环境 主keepalived&#xff1a;172.168.1.11 lvs 备keepalived&#xff1a;172.168.1.12 lvs web1&#xff1a;172.168.1.13 web2&#xff1a;172.168.1.14 vip&#xff1a;172.168.1.100 客户机访问 关闭防火墙 二、配置 主keepalived 服务器 1. 安装…

给返修就能炫耀几年?16.4的高分,指标优秀,行业公认top 1顶刊!

本周投稿推荐 SSCI • 2区社科类&#xff0c;3.0-4.0&#xff08;社科均可&#xff09; EI • 计算机工程类&#xff08;接收广&#xff0c;录用极快&#xff09; SCI&EI • 4区生物医学类&#xff0c;1.5-2.0&#xff08;录用率99%&#xff09; • 1区工程类&#…

深入学习指针2

前言 hello,我又来了&#xff0c;今天有我继续带领大家深入的学习指针&#xff0c;通过上次的学习&#xff0c;我们已经了解到了指针的基本概念&#xff0c;指针如何使用&#xff0c;指针使用的益处&#xff0c;以及一些相关的概念&#xff0c;那今天我们就继续深入的学习&am…

JDK的串行收集器介绍与优化指南-01

JDK串行收集器概述 定义与背景 串行收集器&#xff08;Serial Collector&#xff09;是Java虚拟机&#xff08;JVM&#xff09;中的一种单线程垃圾收集器&#xff0c;它在垃圾收集过程中会暂停所有工作线程&#xff0c;直至收集完成。它适用于内存资源受限、对吞吐量要求不高…

MySQL 大量数据插入优化

效率最好的方式是&#xff1a;批量插入 开启事务。 1、数据批量插入相比数据逐条插入的运行效率得到极大提升&#xff1b; ## 批量插入 INSERT INTO table (field1, field12,...) VALUES (valuea1, valuea2,...), (valueb1, valueb2,...),...;当数据逐条插入时&#xff0c;每…

OFDM802.11a的FPGA实现(十四)data域的设计优化,挤掉axi协议传输中的气泡

原文链接&#xff08;相关文章合集&#xff09;&#xff1a;OFDM 802.11a的xilinx FPGA实现 目录 1.前言 2.data域的时序要求 3.Debug 1.前言 前面12篇文章详细讲述了&#xff0c;OFDM 802.11a发射部分data域的FPGA实现和验证&#xff0c;今天对data域的设计做一个总结。在…

升级! 测试萌新Python学习之连通数据库Pymsql增删改及封装(四)

pymysql 数据库概述python对数据库的增删改查pymysql核心操作事务事务操作pymysql工具类封装每日复习ChatGPT的回答 数据库概述 分类 关系型数据库: 安全 如, mysql oracle SQLite…database tables 行列 非关系型数据库: 高效 如, redis mongoDB…数据存储结构多样 键值对…